ua_client_subscriptions.c 50 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2015 (c) Oleksiy Vasylyev
  7. * Copyright 2016 (c) Sten Grüner
  8. * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2016-2017 (c) Florian Palm
  10. * Copyright 2017 (c) Frank Meerkötter
  11. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  12. */
  13. #include <open62541/client_highlevel.h>
  14. #include <open62541/client_highlevel_async.h>
  15. #include "ua_client_internal.h"
  16. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  17. /*****************/
  18. /* Subscriptions */
  19. /*****************/
  20. static UA_StatusCode
  21. __Subscriptions_create_prepare(
  22. CustomCallback *cc, const UA_CreateSubscriptionRequest *request,
  23. void *subscriptionContext,
  24. UA_Client_StatusChangeNotificationCallback statusChangeCallback,
  25. UA_Client_DeleteSubscriptionCallback deleteCallback) {
  26. UA_Client_Subscription *sub =
  27. (UA_Client_Subscription *)(cc->clientData =
  28. UA_malloc(sizeof(UA_Client_Subscription)));
  29. if(!sub)
  30. return UA_STATUSCODE_BADOUTOFMEMORY;
  31. sub->context = subscriptionContext;
  32. sub->statusChangeCallback = statusChangeCallback;
  33. sub->deleteCallback = deleteCallback;
  34. return UA_STATUSCODE_GOOD;
  35. }
  36. static void
  37. __Subscriptions_create_handler(UA_Client *client, void *data, UA_UInt32 requestId,
  38. void *r) {
  39. UA_CreateSubscriptionResponse *response = (UA_CreateSubscriptionResponse *)r;
  40. CustomCallback *cc = (CustomCallback *)data;
  41. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  42. goto cleanup;
  43. }
  44. UA_Client_Subscription *newSub = (UA_Client_Subscription *)cc->clientData;
  45. cc->clientData = NULL;
  46. /* Prepare the internal representation */
  47. newSub->subscriptionId = response->subscriptionId;
  48. newSub->sequenceNumber = 0;
  49. newSub->lastActivity = UA_DateTime_nowMonotonic();
  50. newSub->publishingInterval = response->revisedPublishingInterval;
  51. newSub->maxKeepAliveCount = response->revisedMaxKeepAliveCount;
  52. LIST_INIT(&newSub->monitoredItems);
  53. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  54. cleanup:
  55. if(cc->clientData)
  56. UA_free(cc->clientData);
  57. if(cc->isAsync) {
  58. if(cc->userCallback)
  59. cc->userCallback(client, cc->userData, requestId, response);
  60. UA_free(cc);
  61. }
  62. }
  63. UA_CreateSubscriptionResponse
  64. UA_Client_Subscriptions_create(UA_Client *client,
  65. const UA_CreateSubscriptionRequest request,
  66. void *subscriptionContext,
  67. UA_Client_StatusChangeNotificationCallback statusChangeCallback,
  68. UA_Client_DeleteSubscriptionCallback deleteCallback) {
  69. UA_CreateSubscriptionResponse response;
  70. UA_CreateSubscriptionResponse_init(&response);
  71. CustomCallback cc;
  72. memset(&cc, 0, sizeof(CustomCallback));
  73. #ifdef __clang_analyzer__
  74. cc.isAsync = false;
  75. #endif
  76. UA_StatusCode retval = __Subscriptions_create_prepare(
  77. &cc, &request, subscriptionContext, statusChangeCallback, deleteCallback);
  78. if(retval != UA_STATUSCODE_GOOD) {
  79. response.responseHeader.serviceResult = retval;
  80. return response;
  81. }
  82. /* Send the request as a synchronous service call */
  83. __UA_Client_Service(client,
  84. &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
  85. &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]);
  86. __Subscriptions_create_handler(client, &cc, 0, &response);
  87. return response;
  88. }
  89. UA_StatusCode
  90. UA_Client_Subscriptions_create_async(
  91. UA_Client *client, const UA_CreateSubscriptionRequest request,
  92. void *subscriptionContext,
  93. UA_Client_StatusChangeNotificationCallback statusChangeCallback,
  94. UA_Client_DeleteSubscriptionCallback deleteCallback,
  95. UA_ClientAsyncServiceCallback createCallback, void *userdata, UA_UInt32 *requestId) {
  96. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  97. CustomCallback *cc = (CustomCallback *)UA_calloc(1, sizeof(CustomCallback));
  98. if(!cc) {
  99. return UA_STATUSCODE_BADOUTOFMEMORY;
  100. }
  101. cc->isAsync = true;
  102. cc->userCallback = createCallback;
  103. cc->userData = userdata;
  104. retval = __Subscriptions_create_prepare(cc, &request, subscriptionContext,
  105. statusChangeCallback, deleteCallback);
  106. if(retval != UA_STATUSCODE_GOOD) {
  107. goto cleanup;
  108. }
  109. /* Send the request as asynchronous service call */
  110. return __UA_Client_AsyncService(
  111. client, &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
  112. __Subscriptions_create_handler, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE],
  113. cc, requestId);
  114. cleanup:
  115. if(cc->clientData)
  116. UA_free(cc->clientData);
  117. UA_free(cc);
  118. return retval;
  119. }
  120. static UA_Client_Subscription *
  121. findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) {
  122. UA_Client_Subscription *sub = NULL;
  123. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  124. if(sub->subscriptionId == subscriptionId)
  125. break;
  126. }
  127. return sub;
  128. }
  129. static void
  130. __Subscriptions_modify_handler(UA_Client *client, void *data, UA_UInt32 requestId,
  131. void *r) {
  132. UA_ModifySubscriptionResponse *response = (UA_ModifySubscriptionResponse *)r;
  133. CustomCallback *cc = (CustomCallback *)data;
  134. UA_Client_Subscription *sub = (UA_Client_Subscription *)cc->clientData;
  135. sub->publishingInterval = response->revisedPublishingInterval;
  136. sub->maxKeepAliveCount = response->revisedMaxKeepAliveCount;
  137. if(cc->isAsync) {
  138. if(cc->userCallback)
  139. cc->userCallback(client, cc->userData, requestId, response);
  140. UA_free(cc);
  141. }
  142. }
  143. UA_ModifySubscriptionResponse
  144. UA_Client_Subscriptions_modify(UA_Client *client, const UA_ModifySubscriptionRequest request) {
  145. UA_ModifySubscriptionResponse response;
  146. UA_ModifySubscriptionResponse_init(&response);
  147. /* Find the internal representation */
  148. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  149. if(!sub) {
  150. response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  151. return response;
  152. }
  153. /* Call the service */
  154. __UA_Client_Service(client,
  155. &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
  156. &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
  157. /* Adjust the internal representation */
  158. sub->publishingInterval = response.revisedPublishingInterval;
  159. sub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  160. return response;
  161. }
  162. UA_StatusCode
  163. UA_Client_Subscriptions_modify_async(UA_Client *client,
  164. const UA_ModifySubscriptionRequest request,
  165. UA_ClientAsyncServiceCallback callback,
  166. void *userdata, UA_UInt32 *requestId) {
  167. /* Find the internal representation */
  168. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  169. if(!sub)
  170. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  171. CustomCallback *cc = (CustomCallback *)UA_calloc(1, sizeof(CustomCallback));
  172. if(!cc)
  173. return UA_STATUSCODE_BADOUTOFMEMORY;
  174. cc->isAsync = true;
  175. cc->clientData = sub;
  176. cc->userData = userdata;
  177. cc->userCallback = callback;
  178. return __UA_Client_AsyncService(
  179. client, &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
  180. __Subscriptions_modify_handler, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE],
  181. cc, requestId);
  182. }
  183. static void
  184. UA_Client_Subscription_deleteInternal(UA_Client *client, UA_Client_Subscription *sub) {
  185. /* Remove the MonitoredItems */
  186. UA_Client_MonitoredItem *mon, *mon_tmp;
  187. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp)
  188. UA_Client_MonitoredItem_remove(client, sub, mon);
  189. /* Call the delete callback */
  190. if(sub->deleteCallback)
  191. sub->deleteCallback(client, sub->subscriptionId, sub->context);
  192. /* Remove */
  193. LIST_REMOVE(sub, listEntry);
  194. UA_free(sub);
  195. }
  196. typedef struct {
  197. UA_DeleteSubscriptionsRequest *request;
  198. UA_Client_Subscription **subs;
  199. } Subscriptions_DeleteData;
  200. static void
  201. __Subscriptions_DeleteData_free(Subscriptions_DeleteData *data) {
  202. if(!data)
  203. return;
  204. if(data->subs)
  205. UA_free(data->subs);
  206. if(data->request)
  207. UA_delete(data->request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST]);
  208. UA_free(data);
  209. }
  210. static UA_INLINE void
  211. __Subscriptions_delete_prepare(UA_Client *client, Subscriptions_DeleteData *data) {
  212. /* temporary remove the subscriptions from the list */
  213. for(size_t i = 0; i < data->request->subscriptionIdsSize; i++) {
  214. data->subs[i] = findSubscription(client, data->request->subscriptionIds[i]);
  215. if(data->subs[i])
  216. LIST_REMOVE(data->subs[i], listEntry);
  217. }
  218. }
  219. static void
  220. __Subscriptions_delete_handler(UA_Client *client, void *data, UA_UInt32 requestId,
  221. void *r) {
  222. UA_DeleteSubscriptionsResponse *response = (UA_DeleteSubscriptionsResponse *)r;
  223. CustomCallback *cc = (CustomCallback *)data;
  224. Subscriptions_DeleteData *delData = (Subscriptions_DeleteData *)cc->clientData;
  225. UA_DeleteSubscriptionsRequest *request = delData->request;
  226. UA_Client_Subscription **subs = delData->subs;
  227. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  228. goto cleanup;
  229. if(request->subscriptionIdsSize != response->resultsSize) {
  230. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  231. goto cleanup;
  232. }
  233. /* Loop over the removed subscriptions and remove internally */
  234. for(size_t i = 0; i < request->subscriptionIdsSize; i++) {
  235. if(response->results[i] != UA_STATUSCODE_GOOD &&
  236. response->results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  237. /* Something was wrong, reinsert the subscription in the list */
  238. if (subs[i])
  239. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  240. continue;
  241. }
  242. if(!subs[i]) {
  243. UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  244. "No internal representation of subscription %u",
  245. delData->request->subscriptionIds[i]);
  246. continue;
  247. }
  248. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  249. UA_Client_Subscription_deleteInternal(client, subs[i]);
  250. }
  251. cleanup:
  252. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  253. for(size_t i = 0; i < request->subscriptionIdsSize; i++) {
  254. if(subs[i]) {
  255. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  256. }
  257. }
  258. }
  259. if(cc->isAsync) {
  260. if(cc->userCallback)
  261. cc->userCallback(client, cc->userData, requestId, response);
  262. __Subscriptions_DeleteData_free(delData);
  263. UA_free(cc);
  264. }
  265. }
  266. UA_StatusCode
  267. UA_Client_Subscriptions_delete_async(UA_Client *client,
  268. const UA_DeleteSubscriptionsRequest request,
  269. UA_ClientAsyncServiceCallback callback,
  270. void *userdata, UA_UInt32 *requestId) {
  271. CustomCallback *cc = (CustomCallback *)UA_calloc(1, sizeof(CustomCallback));
  272. if(!cc)
  273. return UA_STATUSCODE_BADOUTOFMEMORY;
  274. Subscriptions_DeleteData *data =
  275. (Subscriptions_DeleteData *)UA_calloc(1, sizeof(Subscriptions_DeleteData));
  276. if(cc->clientData)
  277. goto cleanup;
  278. cc->clientData = data;
  279. data->subs = (UA_Client_Subscription **)UA_calloc(request.subscriptionIdsSize,
  280. sizeof(UA_Client_Subscription *));
  281. if(!data->subs)
  282. goto cleanup;
  283. data->request = UA_DeleteSubscriptionsRequest_new();
  284. if(!data->request)
  285. goto cleanup;
  286. UA_DeleteSubscriptionsRequest_copy(&request, data->request);
  287. __Subscriptions_delete_prepare(client, data);
  288. cc->isAsync = true;
  289. cc->userCallback = callback;
  290. cc->userData = userdata;
  291. return __UA_Client_AsyncService(
  292. client, &request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
  293. __Subscriptions_delete_handler, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE],
  294. cc, requestId);
  295. cleanup:
  296. __Subscriptions_DeleteData_free(data);
  297. UA_free(cc);
  298. return UA_STATUSCODE_BADOUTOFMEMORY;
  299. }
  300. UA_DeleteSubscriptionsResponse
  301. UA_Client_Subscriptions_delete(UA_Client *client,
  302. const UA_DeleteSubscriptionsRequest request) {
  303. UA_STACKARRAY(UA_Client_Subscription *, subs, request.subscriptionIdsSize);
  304. memset(subs, 0, sizeof(void *) * request.subscriptionIdsSize);
  305. CustomCallback cc;
  306. memset(&cc, 0, sizeof(CustomCallback));
  307. #ifdef __clang_analyzer__
  308. cc.isAsync = false;
  309. #endif
  310. Subscriptions_DeleteData data;
  311. cc.clientData = &data;
  312. data.request = (UA_DeleteSubscriptionsRequest *)(uintptr_t)&request;
  313. data.subs = subs;
  314. __Subscriptions_delete_prepare(client, &data);
  315. /* Send the request */
  316. UA_DeleteSubscriptionsResponse response;
  317. __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
  318. &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]);
  319. __Subscriptions_delete_handler(client, &cc, 0, &response);
  320. return response;
  321. }
  322. UA_StatusCode
  323. UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) {
  324. UA_DeleteSubscriptionsRequest request;
  325. UA_DeleteSubscriptionsRequest_init(&request);
  326. request.subscriptionIds = &subscriptionId;
  327. request.subscriptionIdsSize = 1;
  328. UA_DeleteSubscriptionsResponse response =
  329. UA_Client_Subscriptions_delete(client, request);
  330. UA_StatusCode retval = response.responseHeader.serviceResult;
  331. if(retval != UA_STATUSCODE_GOOD) {
  332. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  333. return retval;
  334. }
  335. if(response.resultsSize != 1) {
  336. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  337. return UA_STATUSCODE_BADINTERNALERROR;
  338. }
  339. retval = response.results[0];
  340. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  341. return retval;
  342. }
  343. /******************/
  344. /* MonitoredItems */
  345. /******************/
  346. void
  347. UA_Client_MonitoredItem_remove(UA_Client *client, UA_Client_Subscription *sub,
  348. UA_Client_MonitoredItem *mon) {
  349. // NOLINTNEXTLINE
  350. LIST_REMOVE(mon, listEntry);
  351. if(mon->deleteCallback)
  352. mon->deleteCallback(client, sub->subscriptionId, sub->context,
  353. mon->monitoredItemId, mon->context);
  354. UA_free(mon);
  355. }
  356. typedef struct {
  357. UA_Client_Subscription *sub;
  358. UA_Client_MonitoredItem **mis;
  359. void **contexts;
  360. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks;
  361. void **handlingCallbacks;
  362. UA_CreateMonitoredItemsRequest *request;
  363. } MonitoredItems_CreateData;
  364. static void
  365. MonitoredItems_CreateData_deleteItems(MonitoredItems_CreateData *data,
  366. UA_Client *client) {
  367. if(!data)
  368. return;
  369. #ifdef __clang_analyzer__
  370. /* The clang analyzer requires the information that the loop below is executed
  371. which is already checked in the __UA_Client_MonitoredItems_create */
  372. assert(data->request->itemsToCreateSize);
  373. #endif
  374. bool hasCallbacks = data->deleteCallbacks != NULL && data->contexts != NULL;
  375. if(data->request && data->mis) {
  376. for(size_t i = 0; i < data->request->itemsToCreateSize; i++) {
  377. if(data->mis[i]) {
  378. if(hasCallbacks && data->deleteCallbacks[i]) {
  379. if(data->sub)
  380. data->deleteCallbacks[i](client, data->sub->subscriptionId,
  381. data->sub->context, 0,
  382. data->contexts[i]);
  383. else
  384. data->deleteCallbacks[i](client, 0, NULL, 0, data->contexts[i]);
  385. }
  386. UA_free(data->mis[i]);
  387. }
  388. }
  389. }
  390. }
  391. static void
  392. MonitoredItems_CreateData_free(MonitoredItems_CreateData *data) {
  393. if(!data)
  394. return;
  395. /* contains contexts, deleteCallbacs, handlingCallbacks as well */
  396. if(data->mis)
  397. UA_free(data->mis);
  398. if(data->request)
  399. UA_CreateMonitoredItemsRequest_delete(data->request);
  400. UA_free(data);
  401. }
  402. static void
  403. __MonitoredItems_create_handler(UA_Client *client, void *d, UA_UInt32 requestId,
  404. void *r) {
  405. UA_CreateMonitoredItemsResponse *response = (UA_CreateMonitoredItemsResponse *)r;
  406. CustomCallback *cc = (CustomCallback *)d;
  407. MonitoredItems_CreateData *data = (MonitoredItems_CreateData *)cc->clientData;
  408. // introduce local pointers to the variables/parameters in the CreateData
  409. // to keep the code completely intact
  410. UA_CreateMonitoredItemsRequest *request = data->request;
  411. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks = data->deleteCallbacks;
  412. UA_Client_Subscription *sub = data->sub;
  413. void **contexts = data->contexts;
  414. UA_Client_MonitoredItem **mis = data->mis;
  415. void **handlingCallbacks = data->handlingCallbacks;
  416. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  417. goto cleanup;
  418. if(response->resultsSize != request->itemsToCreateSize) {
  419. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  420. goto cleanup;
  421. }
  422. /* Add internally */
  423. for(size_t i = 0; i < request->itemsToCreateSize; i++) {
  424. if(response->results[i].statusCode != UA_STATUSCODE_GOOD) {
  425. if(deleteCallbacks[i])
  426. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  427. UA_free(mis[i]);
  428. mis[i] = NULL;
  429. continue;
  430. }
  431. UA_assert(mis[i] != NULL);
  432. UA_Client_MonitoredItem *newMon = mis[i];
  433. newMon->clientHandle = request->itemsToCreate[i].requestedParameters.clientHandle;
  434. newMon->monitoredItemId = response->results[i].monitoredItemId;
  435. newMon->context = contexts[i];
  436. newMon->deleteCallback = deleteCallbacks[i];
  437. newMon->handler.dataChangeCallback =
  438. (UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallbacks[i];
  439. newMon->isEventMonitoredItem =
  440. (request->itemsToCreate[i].itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER);
  441. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  442. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  443. "Subscription %u | Added a MonitoredItem with handle %u",
  444. sub->subscriptionId, newMon->clientHandle);
  445. mis[i] = NULL;
  446. }
  447. cleanup:
  448. MonitoredItems_CreateData_deleteItems(data, client);
  449. if(cc->isAsync) {
  450. if(cc->userCallback)
  451. cc->userCallback(client, cc->userData, requestId, response);
  452. MonitoredItems_CreateData_free(data);
  453. UA_free(cc);
  454. }
  455. }
  456. static UA_StatusCode
  457. MonitoredItems_CreateData_prepare(MonitoredItems_CreateData *data, UA_Client *client) {
  458. /* Allocate the memory for internal representations */
  459. for(size_t i = 0; i < data->request->itemsToCreateSize; i++) {
  460. data->mis[i] =
  461. (UA_Client_MonitoredItem *)UA_malloc(sizeof(UA_Client_MonitoredItem));
  462. if(!data->mis[i]) {
  463. return UA_STATUSCODE_BADOUTOFMEMORY;
  464. }
  465. }
  466. /* Set the clientHandle */
  467. for(size_t i = 0; i < data->request->itemsToCreateSize; i++)
  468. data->request->itemsToCreate[i].requestedParameters.clientHandle =
  469. ++(client->monitoredItemHandles);
  470. return UA_STATUSCODE_GOOD;
  471. }
  472. static void
  473. __UA_Client_MonitoredItems_create(UA_Client *client,
  474. const UA_CreateMonitoredItemsRequest *request,
  475. void **contexts, void **handlingCallbacks,
  476. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
  477. UA_CreateMonitoredItemsResponse *response) {
  478. UA_CreateMonitoredItemsResponse_init(response);
  479. if(!request->itemsToCreateSize) {
  480. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  481. return;
  482. }
  483. CustomCallback cc;
  484. memset(&cc, 0, sizeof(CustomCallback));
  485. #ifdef __clang_analyzer__
  486. cc.isAsync = false;
  487. #endif
  488. /* Fix clang warning */
  489. size_t itemsToCreateSize = request->itemsToCreateSize;
  490. UA_STACKARRAY(UA_Client_MonitoredItem *, mis, itemsToCreateSize);
  491. memset(mis, 0, sizeof(void *) * itemsToCreateSize);
  492. MonitoredItems_CreateData data;
  493. memset(&data, 0, sizeof(MonitoredItems_CreateData));
  494. data.request = (UA_CreateMonitoredItemsRequest *)(uintptr_t)request;
  495. data.contexts = contexts;
  496. data.handlingCallbacks = handlingCallbacks;
  497. data.deleteCallbacks = deleteCallbacks;
  498. data.mis = mis;
  499. cc.clientData = &data;
  500. /* Get the subscription */
  501. data.sub = findSubscription(client, request->subscriptionId);
  502. if(!data.sub) {
  503. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  504. goto cleanup;
  505. }
  506. UA_StatusCode retval = MonitoredItems_CreateData_prepare(&data, client);
  507. if(retval != UA_STATUSCODE_GOOD) {
  508. response->responseHeader.serviceResult = retval;
  509. goto cleanup;
  510. }
  511. /* Call the service */
  512. __UA_Client_Service(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
  513. response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]);
  514. __MonitoredItems_create_handler(client, &cc, 0, response);
  515. return;
  516. cleanup:
  517. MonitoredItems_CreateData_deleteItems(&data, client);
  518. }
  519. static UA_StatusCode
  520. __UA_Client_MonitoredItems_createDataChanges_async(
  521. UA_Client *client, const UA_CreateMonitoredItemsRequest request, void **contexts,
  522. void **callbacks, UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
  523. UA_ClientAsyncServiceCallback createCallback, void *userdata, UA_UInt32 *requestId) {
  524. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  525. CustomCallback *cc = (CustomCallback *)UA_calloc(1, sizeof(CustomCallback));
  526. if(!cc)
  527. return UA_STATUSCODE_BADOUTOFMEMORY;
  528. cc->userCallback = createCallback;
  529. cc->userData = userdata;
  530. MonitoredItems_CreateData *data =
  531. (MonitoredItems_CreateData *)UA_calloc(1, sizeof(MonitoredItems_CreateData));
  532. if(!data) {
  533. retval = UA_STATUSCODE_BADOUTOFMEMORY;
  534. goto cleanup;
  535. }
  536. cc->isAsync = true;
  537. cc->clientData = data;
  538. data->sub = findSubscription(client, request.subscriptionId);
  539. if(!data->sub) {
  540. retval = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  541. goto cleanup;
  542. }
  543. /* create a big array that holds the monitored items and parameters */
  544. void **array = (void **)UA_calloc(4 * request.itemsToCreateSize, sizeof(void *));
  545. if(!array) {
  546. retval = UA_STATUSCODE_BADOUTOFMEMORY;
  547. goto cleanup;
  548. }
  549. data->mis = (UA_Client_MonitoredItem **)array;
  550. data->contexts =
  551. (void **)((uintptr_t)array + (sizeof(void *) * request.itemsToCreateSize));
  552. memcpy(data->contexts, contexts, request.itemsToCreateSize * sizeof(void *));
  553. data->deleteCallbacks =
  554. (UA_Client_DeleteMonitoredItemCallback *)((uintptr_t)array +
  555. (2 * request.itemsToCreateSize *
  556. sizeof(void *)));
  557. memcpy(data->deleteCallbacks, deleteCallbacks,
  558. request.itemsToCreateSize * sizeof(UA_Client_DeleteMonitoredItemCallback));
  559. data->handlingCallbacks =
  560. (void **)((uintptr_t)array + (3 * request.itemsToCreateSize * sizeof(void *)));
  561. memcpy(data->handlingCallbacks, callbacks,
  562. request.itemsToCreateSize * sizeof(void *));
  563. data->request = UA_CreateMonitoredItemsRequest_new();
  564. if(!data->request) {
  565. retval = UA_STATUSCODE_BADOUTOFMEMORY;
  566. goto cleanup;
  567. }
  568. retval = UA_CreateMonitoredItemsRequest_copy(&request, data->request);
  569. if(retval != UA_STATUSCODE_GOOD)
  570. goto cleanup;
  571. retval = MonitoredItems_CreateData_prepare(data, client);
  572. if(retval != UA_STATUSCODE_GOOD)
  573. goto cleanup;
  574. return __UA_Client_AsyncService(
  575. client, data->request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
  576. __MonitoredItems_create_handler, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE],
  577. cc, requestId);
  578. cleanup:
  579. MonitoredItems_CreateData_deleteItems(data, client);
  580. MonitoredItems_CreateData_free(data);
  581. UA_free(cc);
  582. return retval;
  583. }
  584. UA_CreateMonitoredItemsResponse
  585. UA_Client_MonitoredItems_createDataChanges(UA_Client *client,
  586. const UA_CreateMonitoredItemsRequest request, void **contexts,
  587. UA_Client_DataChangeNotificationCallback *callbacks,
  588. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
  589. UA_CreateMonitoredItemsResponse response;
  590. __UA_Client_MonitoredItems_create(client, &request, contexts,
  591. (void**)(uintptr_t)callbacks, deleteCallbacks, &response);
  592. return response;
  593. }
  594. UA_StatusCode
  595. UA_Client_MonitoredItems_createDataChanges_async(
  596. UA_Client *client, const UA_CreateMonitoredItemsRequest request, void **contexts,
  597. UA_Client_DataChangeNotificationCallback *callbacks,
  598. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
  599. UA_ClientAsyncServiceCallback createCallback, void *userdata, UA_UInt32 *requestId) {
  600. return __UA_Client_MonitoredItems_createDataChanges_async(
  601. client, request, contexts, (void **)(uintptr_t)callbacks, deleteCallbacks,
  602. createCallback, userdata, requestId);
  603. }
  604. UA_MonitoredItemCreateResult
  605. UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId,
  606. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  607. void *context, UA_Client_DataChangeNotificationCallback callback,
  608. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  609. UA_CreateMonitoredItemsRequest request;
  610. UA_CreateMonitoredItemsRequest_init(&request);
  611. request.subscriptionId = subscriptionId;
  612. request.timestampsToReturn = timestampsToReturn;
  613. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  614. request.itemsToCreateSize = 1;
  615. UA_CreateMonitoredItemsResponse response =
  616. UA_Client_MonitoredItems_createDataChanges(client, request, &context,
  617. &callback, &deleteCallback);
  618. UA_MonitoredItemCreateResult result;
  619. UA_MonitoredItemCreateResult_init(&result);
  620. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  621. result.statusCode = response.responseHeader.serviceResult;
  622. if(result.statusCode == UA_STATUSCODE_GOOD &&
  623. response.resultsSize != 1)
  624. result.statusCode = UA_STATUSCODE_BADINTERNALERROR;
  625. if(result.statusCode == UA_STATUSCODE_GOOD)
  626. UA_MonitoredItemCreateResult_copy(&response.results[0] , &result);
  627. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  628. return result;
  629. }
  630. UA_CreateMonitoredItemsResponse
  631. UA_Client_MonitoredItems_createEvents(UA_Client *client,
  632. const UA_CreateMonitoredItemsRequest request, void **contexts,
  633. UA_Client_EventNotificationCallback *callback,
  634. UA_Client_DeleteMonitoredItemCallback *deleteCallback) {
  635. UA_CreateMonitoredItemsResponse response;
  636. __UA_Client_MonitoredItems_create(client, &request, contexts,
  637. (void**)(uintptr_t)callback, deleteCallback, &response);
  638. return response;
  639. }
  640. /* Monitor the EventNotifier attribute only */
  641. UA_StatusCode
  642. UA_Client_MonitoredItems_createEvents_async(
  643. UA_Client *client, const UA_CreateMonitoredItemsRequest request, void **contexts,
  644. UA_Client_EventNotificationCallback *callbacks,
  645. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
  646. UA_ClientAsyncServiceCallback createCallback, void *userdata, UA_UInt32 *requestId) {
  647. return __UA_Client_MonitoredItems_createDataChanges_async(
  648. client, request, contexts, (void **)(uintptr_t)callbacks, deleteCallbacks,
  649. createCallback, userdata, requestId);
  650. }
  651. UA_MonitoredItemCreateResult
  652. UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId,
  653. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  654. void *context, UA_Client_EventNotificationCallback callback,
  655. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  656. UA_CreateMonitoredItemsRequest request;
  657. UA_CreateMonitoredItemsRequest_init(&request);
  658. request.subscriptionId = subscriptionId;
  659. request.timestampsToReturn = timestampsToReturn;
  660. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  661. request.itemsToCreateSize = 1;
  662. UA_CreateMonitoredItemsResponse response =
  663. UA_Client_MonitoredItems_createEvents(client, request, &context,
  664. &callback, &deleteCallback);
  665. UA_StatusCode retval = response.responseHeader.serviceResult;
  666. UA_MonitoredItemCreateResult result;
  667. UA_MonitoredItemCreateResult_init(&result);
  668. if(retval != UA_STATUSCODE_GOOD) {
  669. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  670. result.statusCode = retval;
  671. return result;
  672. }
  673. UA_MonitoredItemCreateResult_copy(response.results , &result);
  674. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  675. return result;
  676. }
  677. static void
  678. __MonitoredItems_delete_handler(UA_Client *client, void *d, UA_UInt32 requestId,
  679. void *r) {
  680. UA_DeleteMonitoredItemsResponse *response = (UA_DeleteMonitoredItemsResponse *)r;
  681. CustomCallback *cc = (CustomCallback *)d;
  682. UA_DeleteMonitoredItemsRequest *request =
  683. (UA_DeleteMonitoredItemsRequest *)cc->clientData;
  684. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  685. goto cleanup;
  686. UA_Client_Subscription *sub = findSubscription(client, request->subscriptionId);
  687. if(!sub) {
  688. UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  689. "No internal representation of subscription %u",
  690. request->subscriptionId);
  691. goto cleanup;
  692. }
  693. /* Loop over deleted MonitoredItems */
  694. for(size_t i = 0; i < response->resultsSize; i++) {
  695. if(response->results[i] != UA_STATUSCODE_GOOD &&
  696. response->results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  697. continue;
  698. }
  699. #ifndef __clang_analyzer__
  700. /* Delete the internal representation */
  701. UA_Client_MonitoredItem *mon;
  702. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  703. // NOLINTNEXTLINE
  704. if(mon->monitoredItemId == request->monitoredItemIds[i]) {
  705. UA_Client_MonitoredItem_remove(client, sub, mon);
  706. break;
  707. }
  708. }
  709. #endif
  710. }
  711. cleanup:
  712. if(cc->isAsync) {
  713. if(cc->userCallback)
  714. cc->userCallback(client, cc->userData, requestId, response);
  715. UA_DeleteMonitoredItemsRequest_delete(request);
  716. UA_free(cc);
  717. }
  718. }
  719. UA_DeleteMonitoredItemsResponse
  720. UA_Client_MonitoredItems_delete(UA_Client *client,
  721. const UA_DeleteMonitoredItemsRequest request) {
  722. /* Send the request */
  723. UA_DeleteMonitoredItemsResponse response;
  724. CustomCallback cc;
  725. memset(&cc, 0, sizeof(CustomCallback));
  726. #ifdef __clang_analyzer__
  727. cc.isAsync = false;
  728. #endif
  729. cc.clientData = (void *)(uintptr_t)&request;
  730. __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
  731. &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]);
  732. __MonitoredItems_delete_handler(client, &cc, 0, &response);
  733. return response;
  734. }
  735. UA_StatusCode
  736. UA_Client_MonitoredItems_delete_async(UA_Client *client,
  737. const UA_DeleteMonitoredItemsRequest request,
  738. UA_ClientAsyncServiceCallback callback,
  739. void *userdata, UA_UInt32 *requestId) {
  740. /* Send the request */
  741. CustomCallback *cc = (CustomCallback *)UA_calloc(1, sizeof(CustomCallback));
  742. if(!cc)
  743. return UA_STATUSCODE_BADOUTOFMEMORY;
  744. UA_DeleteMonitoredItemsRequest *req_copy = UA_DeleteMonitoredItemsRequest_new();
  745. if(!req_copy) {
  746. UA_free(cc);
  747. return UA_STATUSCODE_BADOUTOFMEMORY;
  748. }
  749. UA_DeleteMonitoredItemsRequest_copy(&request, req_copy);
  750. cc->isAsync = true;
  751. cc->clientData = req_copy;
  752. cc->userCallback = callback;
  753. cc->userData = userdata;
  754. return __UA_Client_AsyncService(
  755. client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
  756. __MonitoredItems_delete_handler, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE],
  757. cc, requestId);
  758. }
  759. UA_StatusCode
  760. UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId,
  761. UA_UInt32 monitoredItemId) {
  762. UA_DeleteMonitoredItemsRequest request;
  763. UA_DeleteMonitoredItemsRequest_init(&request);
  764. request.subscriptionId = subscriptionId;
  765. request.monitoredItemIds = &monitoredItemId;
  766. request.monitoredItemIdsSize = 1;
  767. UA_DeleteMonitoredItemsResponse response =
  768. UA_Client_MonitoredItems_delete(client, request);
  769. UA_StatusCode retval = response.responseHeader.serviceResult;
  770. if(retval != UA_STATUSCODE_GOOD) {
  771. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  772. return retval;
  773. }
  774. if(response.resultsSize != 1) {
  775. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  776. return UA_STATUSCODE_BADINTERNALERROR;
  777. }
  778. retval = response.results[0];
  779. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  780. return retval;
  781. }
  782. UA_ModifyMonitoredItemsResponse UA_EXPORT
  783. UA_Client_MonitoredItems_modify(UA_Client *client,
  784. const UA_ModifyMonitoredItemsRequest request) {
  785. UA_ModifyMonitoredItemsResponse response;
  786. UA_Client_Subscription *sub = 0;
  787. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  788. if (sub->subscriptionId == request.subscriptionId)
  789. break;
  790. }
  791. if (!sub) {
  792. UA_ModifyMonitoredItemsResponse_init(&response);
  793. response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  794. return response;
  795. }
  796. UA_ModifyMonitoredItemsRequest modifiedRequest;
  797. UA_ModifyMonitoredItemsRequest_copy(&request, &modifiedRequest);
  798. for (size_t i = 0; i < modifiedRequest.itemsToModifySize; ++i) {
  799. UA_Client_MonitoredItem *mon = 0;
  800. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  801. if(mon->monitoredItemId == modifiedRequest.itemsToModify[i].monitoredItemId) {
  802. modifiedRequest.itemsToModify[i].requestedParameters.clientHandle = mon->clientHandle;
  803. break;
  804. }
  805. }
  806. }
  807. __UA_Client_Service(client,
  808. &modifiedRequest, &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSREQUEST],
  809. &response, &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSRESPONSE]);
  810. UA_ModifyMonitoredItemsRequest_deleteMembers(&modifiedRequest);
  811. return response;
  812. }
  813. /*************************************/
  814. /* Async Processing of Notifications */
  815. /*************************************/
  816. /* Assume the request is already initialized */
  817. UA_StatusCode
  818. UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) {
  819. /* Count acks */
  820. UA_Client_NotificationsAckNumber *ack;
  821. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  822. ++request->subscriptionAcknowledgementsSize;
  823. /* Create the array. Returns a sentinel pointer if the length is zero. */
  824. request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
  825. UA_Array_new(request->subscriptionAcknowledgementsSize,
  826. &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]);
  827. if(!request->subscriptionAcknowledgements) {
  828. request->subscriptionAcknowledgementsSize = 0;
  829. return UA_STATUSCODE_BADOUTOFMEMORY;
  830. }
  831. size_t i = 0;
  832. UA_Client_NotificationsAckNumber *ack_tmp;
  833. LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) {
  834. request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  835. request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  836. ++i;
  837. LIST_REMOVE(ack, listEntry);
  838. UA_free(ack);
  839. }
  840. return UA_STATUSCODE_GOOD;
  841. }
  842. /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */
  843. /* The value 0 is never used for the sequence number */
  844. static UA_UInt32
  845. UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) {
  846. UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
  847. if(nextSequenceNumber == 0)
  848. nextSequenceNumber = 1;
  849. return nextSequenceNumber;
  850. }
  851. static void
  852. processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub,
  853. UA_DataChangeNotification *dataChangeNotification) {
  854. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  855. UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j];
  856. /* Find the MonitoredItem */
  857. UA_Client_MonitoredItem *mon;
  858. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  859. if(mon->clientHandle == min->clientHandle)
  860. break;
  861. }
  862. if(!mon) {
  863. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  864. "Could not process a notification with clienthandle %u on subscription %u",
  865. min->clientHandle, sub->subscriptionId);
  866. continue;
  867. }
  868. if(mon->isEventMonitoredItem) {
  869. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  870. "MonitoredItem is configured for Events. But received a "
  871. "DataChangeNotification.");
  872. continue;
  873. }
  874. mon->handler.dataChangeCallback(client, sub->subscriptionId, sub->context,
  875. mon->monitoredItemId, mon->context,
  876. &min->value);
  877. }
  878. }
  879. static void
  880. processEventNotification(UA_Client *client, UA_Client_Subscription *sub,
  881. UA_EventNotificationList *eventNotificationList) {
  882. for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
  883. UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
  884. /* Find the MonitoredItem */
  885. UA_Client_MonitoredItem *mon;
  886. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  887. if(mon->clientHandle == eventFieldList->clientHandle)
  888. break;
  889. }
  890. if(!mon) {
  891. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  892. "Could not process a notification with clienthandle %u on subscription %u",
  893. eventFieldList->clientHandle, sub->subscriptionId);
  894. continue;
  895. }
  896. if(!mon->isEventMonitoredItem) {
  897. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  898. "MonitoredItem is configured for DataChanges. But received a "
  899. "EventNotification.");
  900. continue;
  901. }
  902. mon->handler.eventCallback(client, sub->subscriptionId, sub->context,
  903. mon->monitoredItemId, mon->context,
  904. eventFieldList->eventFieldsSize,
  905. eventFieldList->eventFields);
  906. }
  907. }
  908. static void
  909. processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub,
  910. UA_ExtensionObject *msg) {
  911. if(msg->encoding != UA_EXTENSIONOBJECT_DECODED)
  912. return;
  913. /* Handle DataChangeNotification */
  914. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  915. UA_DataChangeNotification *dataChangeNotification =
  916. (UA_DataChangeNotification *)msg->content.decoded.data;
  917. processDataChangeNotification(client, sub, dataChangeNotification);
  918. return;
  919. }
  920. /* Handle EventNotification */
  921. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
  922. UA_EventNotificationList *eventNotificationList =
  923. (UA_EventNotificationList *)msg->content.decoded.data;
  924. processEventNotification(client, sub, eventNotificationList);
  925. return;
  926. }
  927. /* Handle StatusChangeNotification */
  928. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) {
  929. if(sub->statusChangeCallback) {
  930. sub->statusChangeCallback(client, sub->subscriptionId, sub->context,
  931. (UA_StatusChangeNotification*)msg->content.decoded.data);
  932. } else {
  933. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  934. "Dropped a StatusChangeNotification since no callback is registered");
  935. }
  936. return;
  937. }
  938. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  939. "Unknown notification message type");
  940. }
  941. void
  942. UA_Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  943. UA_PublishResponse *response) {
  944. UA_NotificationMessage *msg = &response->notificationMessage;
  945. client->currentlyOutStandingPublishRequests--;
  946. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS) {
  947. if(client->config.outStandingPublishRequests > 1) {
  948. client->config.outStandingPublishRequests--;
  949. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  950. "Too many publishrequest, reduce outStandingPublishRequests to %d",
  951. client->config.outStandingPublishRequests);
  952. } else {
  953. UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  954. "Too many publishrequest when outStandingPublishRequests = 1");
  955. UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId);
  956. }
  957. return;
  958. }
  959. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSHUTDOWN)
  960. return;
  961. if(!LIST_FIRST(&client->subscriptions)) {
  962. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
  963. return;
  964. }
  965. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONCLOSED) {
  966. if(client->state >= UA_CLIENTSTATE_SESSION) {
  967. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  968. "Received Publish Response with code %s",
  969. UA_StatusCode_name(response->responseHeader.serviceResult));
  970. UA_Client_Subscription* sub = findSubscription(client, response->subscriptionId);
  971. if (sub != NULL)
  972. UA_Client_Subscription_deleteInternal(client, sub);
  973. }
  974. return;
  975. }
  976. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID) {
  977. UA_Client_disconnect(client); /* TODO: This should be handled before the process callback */
  978. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  979. "Received BadSessionIdInvalid");
  980. return;
  981. }
  982. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTIMEOUT) {
  983. if (client->config.inactivityCallback)
  984. client->config.inactivityCallback(client);
  985. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  986. "Received Timeout for Publish Response");
  987. return;
  988. }
  989. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  990. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  991. "Received Publish Response with code %s",
  992. UA_StatusCode_name(response->responseHeader.serviceResult));
  993. return;
  994. }
  995. UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
  996. if(!sub) {
  997. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  998. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  999. "Received Publish Response for a non-existant subscription");
  1000. return;
  1001. }
  1002. sub->lastActivity = UA_DateTime_nowMonotonic();
  1003. /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */
  1004. if(UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber) {
  1005. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  1006. "Invalid subscription sequence number: expected %u but got %u",
  1007. UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber),
  1008. msg->sequenceNumber);
  1009. /* This is an error. But we do not abort the connection. Some server
  1010. * SDKs misbehave from time to time and send out-of-order sequence
  1011. * numbers. (Probably some multi-threading synchronization issue.) */
  1012. /* UA_Client_disconnect(client);
  1013. return; */
  1014. }
  1015. /* According to f), a keep-alive message contains no notifications and has the sequence number
  1016. * of the next NotificationMessage that is to be sent => More than one consecutive keep-alive
  1017. * message or a NotificationMessage following a keep-alive message will share the same sequence
  1018. * number. */
  1019. if (msg->notificationDataSize)
  1020. sub->sequenceNumber = msg->sequenceNumber;
  1021. /* Process the notification messages */
  1022. for(size_t k = 0; k < msg->notificationDataSize; ++k)
  1023. processNotificationMessage(client, sub, &msg->notificationData[k]);
  1024. /* Add to the list of pending acks */
  1025. for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) {
  1026. if(response->availableSequenceNumbers[i] != msg->sequenceNumber)
  1027. continue;
  1028. UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*)
  1029. UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  1030. if(!tmpAck) {
  1031. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  1032. "Not enough memory to store the acknowledgement for a publish "
  1033. "message on subscription %u", sub->subscriptionId);
  1034. break;
  1035. }
  1036. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  1037. tmpAck->subAck.subscriptionId = sub->subscriptionId;
  1038. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  1039. break;
  1040. }
  1041. }
  1042. static void
  1043. processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId,
  1044. void *response) {
  1045. UA_PublishRequest *req = (UA_PublishRequest*)userdata;
  1046. UA_PublishResponse *res = (UA_PublishResponse*)response;
  1047. /* Process the response */
  1048. UA_Client_Subscriptions_processPublishResponse(client, req, res);
  1049. /* Delete the cached request */
  1050. UA_PublishRequest_delete(req);
  1051. /* Fill up the outstanding publish requests */
  1052. UA_Client_Subscriptions_backgroundPublish(client);
  1053. }
  1054. void
  1055. UA_Client_Subscriptions_clean(UA_Client *client) {
  1056. UA_Client_NotificationsAckNumber *n, *tmp;
  1057. LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
  1058. LIST_REMOVE(n, listEntry);
  1059. UA_free(n);
  1060. }
  1061. UA_Client_Subscription *sub, *tmps;
  1062. LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
  1063. UA_Client_Subscription_deleteInternal(client, sub); /* force local removal */
  1064. client->monitoredItemHandles = 0;
  1065. }
  1066. void
  1067. UA_Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) {
  1068. if(client->state < UA_CLIENTSTATE_SESSION)
  1069. return;
  1070. /* Is the lack of responses the client's fault? */
  1071. if(client->currentlyOutStandingPublishRequests == 0)
  1072. return;
  1073. UA_Client_Subscription *sub;
  1074. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  1075. UA_DateTime maxSilence = (UA_DateTime)
  1076. ((sub->publishingInterval * sub->maxKeepAliveCount) +
  1077. client->config.timeout) * UA_DATETIME_MSEC;
  1078. if(maxSilence + sub->lastActivity < UA_DateTime_nowMonotonic()) {
  1079. /* Reset activity */
  1080. sub->lastActivity = UA_DateTime_nowMonotonic();
  1081. if(client->config.subscriptionInactivityCallback)
  1082. client->config.subscriptionInactivityCallback(client, sub->subscriptionId,
  1083. sub->context);
  1084. UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  1085. "Inactivity for Subscription %u.", sub->subscriptionId);
  1086. }
  1087. }
  1088. }
  1089. UA_StatusCode
  1090. UA_Client_Subscriptions_backgroundPublish(UA_Client *client) {
  1091. if(client->state < UA_CLIENTSTATE_SESSION)
  1092. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  1093. /* The session must have at least one subscription */
  1094. if(!LIST_FIRST(&client->subscriptions))
  1095. return UA_STATUSCODE_GOOD;
  1096. while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) {
  1097. UA_PublishRequest *request = UA_PublishRequest_new();
  1098. if (!request)
  1099. return UA_STATUSCODE_BADOUTOFMEMORY;
  1100. request->requestHeader.timeoutHint=60000;
  1101. UA_StatusCode retval = UA_Client_preparePublishRequest(client, request);
  1102. if(retval != UA_STATUSCODE_GOOD) {
  1103. UA_PublishRequest_delete(request);
  1104. return retval;
  1105. }
  1106. UA_UInt32 requestId;
  1107. client->currentlyOutStandingPublishRequests++;
  1108. /* Disable the timeout, it is treat in UA_Client_Subscriptions_backgroundPublishInactivityCheck */
  1109. retval = __UA_Client_AsyncServiceEx(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
  1110. processPublishResponseAsync,
  1111. &UA_TYPES[UA_TYPES_PUBLISHRESPONSE],
  1112. (void*)request, &requestId, 0);
  1113. if(retval != UA_STATUSCODE_GOOD) {
  1114. UA_PublishRequest_delete(request);
  1115. return retval;
  1116. }
  1117. }
  1118. return UA_STATUSCODE_GOOD;
  1119. }
  1120. #endif /* UA_ENABLE_SUBSCRIPTIONS */