ua_client_subscriptions.c 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2015 (c) Oleksiy Vasylyev
  7. * Copyright 2016 (c) Sten Grüner
  8. * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2016-2017 (c) Florian Palm
  10. * Copyright 2017 (c) Frank Meerkötter
  11. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  12. */
  13. #include <open62541/client_highlevel.h>
  14. #include <open62541/client_highlevel_async.h>
  15. #include "ua_client_internal.h"
  16. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  17. /*****************/
  18. /* Subscriptions */
  19. /*****************/
  20. UA_CreateSubscriptionResponse UA_EXPORT
  21. UA_Client_Subscriptions_create(UA_Client *client,
  22. const UA_CreateSubscriptionRequest request,
  23. void *subscriptionContext,
  24. UA_Client_StatusChangeNotificationCallback statusChangeCallback,
  25. UA_Client_DeleteSubscriptionCallback deleteCallback) {
  26. UA_CreateSubscriptionResponse response;
  27. UA_CreateSubscriptionResponse_init(&response);
  28. /* Allocate the internal representation */
  29. UA_Client_Subscription *newSub = (UA_Client_Subscription*)
  30. UA_malloc(sizeof(UA_Client_Subscription));
  31. if(!newSub) {
  32. response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  33. return response;
  34. }
  35. /* Send the request as a synchronous service call */
  36. __UA_Client_Service(client,
  37. &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
  38. &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]);
  39. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  40. UA_free(newSub);
  41. return response;
  42. }
  43. /* Prepare the internal representation */
  44. newSub->context = subscriptionContext;
  45. newSub->subscriptionId = response.subscriptionId;
  46. newSub->sequenceNumber = 0;
  47. newSub->lastActivity = UA_DateTime_nowMonotonic();
  48. newSub->statusChangeCallback = statusChangeCallback;
  49. newSub->deleteCallback = deleteCallback;
  50. newSub->publishingInterval = response.revisedPublishingInterval;
  51. newSub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  52. LIST_INIT(&newSub->monitoredItems);
  53. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  54. return response;
  55. }
  56. static UA_Client_Subscription *
  57. findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) {
  58. UA_Client_Subscription *sub = NULL;
  59. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  60. if(sub->subscriptionId == subscriptionId)
  61. break;
  62. }
  63. return sub;
  64. }
  65. UA_ModifySubscriptionResponse UA_EXPORT
  66. UA_Client_Subscriptions_modify(UA_Client *client, const UA_ModifySubscriptionRequest request) {
  67. UA_ModifySubscriptionResponse response;
  68. UA_ModifySubscriptionResponse_init(&response);
  69. /* Find the internal representation */
  70. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  71. if(!sub) {
  72. response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  73. return response;
  74. }
  75. /* Call the service */
  76. __UA_Client_Service(client,
  77. &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
  78. &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
  79. /* Adjust the internal representation */
  80. sub->publishingInterval = response.revisedPublishingInterval;
  81. sub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  82. return response;
  83. }
  84. static void
  85. UA_Client_Subscription_deleteInternal(UA_Client *client, UA_Client_Subscription *sub) {
  86. /* Remove the MonitoredItems */
  87. UA_Client_MonitoredItem *mon, *mon_tmp;
  88. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp)
  89. UA_Client_MonitoredItem_remove(client, sub, mon);
  90. /* Call the delete callback */
  91. if(sub->deleteCallback)
  92. sub->deleteCallback(client, sub->subscriptionId, sub->context);
  93. /* Remove */
  94. LIST_REMOVE(sub, listEntry);
  95. UA_free(sub);
  96. }
  97. UA_DeleteSubscriptionsResponse UA_EXPORT
  98. UA_Client_Subscriptions_delete(UA_Client *client, const UA_DeleteSubscriptionsRequest request) {
  99. UA_STACKARRAY(UA_Client_Subscription*, subs, request.subscriptionIdsSize);
  100. memset(subs, 0, sizeof(void*) * request.subscriptionIdsSize);
  101. /* temporary remove the subscriptions from the list */
  102. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  103. subs[i] = findSubscription(client, request.subscriptionIds[i]);
  104. if (subs[i])
  105. LIST_REMOVE(subs[i], listEntry);
  106. }
  107. /* Send the request */
  108. UA_DeleteSubscriptionsResponse response;
  109. __UA_Client_Service(client,
  110. &request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
  111. &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]);
  112. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  113. goto cleanup;
  114. if(request.subscriptionIdsSize != response.resultsSize) {
  115. response.responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  116. goto cleanup;
  117. }
  118. /* Loop over the removed subscriptions and remove internally */
  119. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  120. if(response.results[i] != UA_STATUSCODE_GOOD &&
  121. response.results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  122. /* Something was wrong, reinsert the subscription in the list */
  123. if (subs[i])
  124. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  125. continue;
  126. }
  127. if(!subs[i]) {
  128. UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  129. "No internal representation of subscription %u",
  130. request.subscriptionIds[i]);
  131. continue;
  132. }
  133. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  134. UA_Client_Subscription_deleteInternal(client, subs[i]);
  135. }
  136. return response;
  137. cleanup:
  138. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  139. if (subs[i]) {
  140. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  141. }
  142. }
  143. return response;
  144. }
  145. UA_StatusCode UA_EXPORT
  146. UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) {
  147. UA_DeleteSubscriptionsRequest request;
  148. UA_DeleteSubscriptionsRequest_init(&request);
  149. request.subscriptionIds = &subscriptionId;
  150. request.subscriptionIdsSize = 1;
  151. UA_DeleteSubscriptionsResponse response =
  152. UA_Client_Subscriptions_delete(client, request);
  153. UA_StatusCode retval = response.responseHeader.serviceResult;
  154. if(retval != UA_STATUSCODE_GOOD) {
  155. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  156. return retval;
  157. }
  158. if(response.resultsSize != 1) {
  159. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  160. return UA_STATUSCODE_BADINTERNALERROR;
  161. }
  162. retval = response.results[0];
  163. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  164. return retval;
  165. }
  166. /******************/
  167. /* MonitoredItems */
  168. /******************/
  169. void
  170. UA_Client_MonitoredItem_remove(UA_Client *client, UA_Client_Subscription *sub,
  171. UA_Client_MonitoredItem *mon) {
  172. // NOLINTNEXTLINE
  173. LIST_REMOVE(mon, listEntry);
  174. if(mon->deleteCallback)
  175. mon->deleteCallback(client, sub->subscriptionId, sub->context,
  176. mon->monitoredItemId, mon->context);
  177. UA_free(mon);
  178. }
  179. static void
  180. __UA_Client_MonitoredItems_create(UA_Client *client,
  181. const UA_CreateMonitoredItemsRequest *request,
  182. void **contexts, void **handlingCallbacks,
  183. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
  184. UA_CreateMonitoredItemsResponse *response) {
  185. UA_CreateMonitoredItemsResponse_init(response);
  186. if (!request->itemsToCreateSize) {
  187. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  188. return;
  189. }
  190. /* Fix clang warning */
  191. size_t itemsToCreateSize = request->itemsToCreateSize;
  192. UA_Client_Subscription *sub = NULL;
  193. /* Allocate the memory for internal representations */
  194. UA_STACKARRAY(UA_Client_MonitoredItem*, mis, itemsToCreateSize);
  195. memset(mis, 0, sizeof(void*) * itemsToCreateSize);
  196. for(size_t i = 0; i < itemsToCreateSize; i++) {
  197. mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem));
  198. if(!mis[i]) {
  199. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  200. goto cleanup;
  201. }
  202. }
  203. /* Get the subscription */
  204. sub = findSubscription(client, request->subscriptionId);
  205. if(!sub) {
  206. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  207. goto cleanup;
  208. }
  209. /* Set the clientHandle */
  210. for(size_t i = 0; i < itemsToCreateSize; i++)
  211. request->itemsToCreate[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  212. /* Call the service */
  213. __UA_Client_Service(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
  214. response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]);
  215. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  216. goto cleanup;
  217. if(response->resultsSize != itemsToCreateSize) {
  218. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  219. goto cleanup;
  220. }
  221. /* Add internally */
  222. for(size_t i = 0; i < itemsToCreateSize; i++) {
  223. if(response->results[i].statusCode != UA_STATUSCODE_GOOD) {
  224. if (deleteCallbacks[i])
  225. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  226. UA_free(mis[i]);
  227. mis[i] = NULL;
  228. continue;
  229. }
  230. UA_Client_MonitoredItem *newMon = mis[i];
  231. newMon->clientHandle = request->itemsToCreate[i].requestedParameters.clientHandle;
  232. newMon->monitoredItemId = response->results[i].monitoredItemId;
  233. newMon->context = contexts[i];
  234. newMon->deleteCallback = deleteCallbacks[i];
  235. newMon->handler.dataChangeCallback =
  236. (UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallbacks[i];
  237. newMon->isEventMonitoredItem =
  238. (request->itemsToCreate[i].itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER);
  239. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  240. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  241. "Subscription %u | Added a MonitoredItem with handle %u",
  242. sub->subscriptionId, newMon->clientHandle);
  243. }
  244. return;
  245. cleanup:
  246. for(size_t i = 0; i < itemsToCreateSize; i++) {
  247. if (deleteCallbacks[i]) {
  248. if (sub)
  249. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  250. else
  251. deleteCallbacks[i](client, 0, NULL, 0, contexts[i]);
  252. }
  253. if(mis[i])
  254. UA_free(mis[i]);
  255. }
  256. }
  257. UA_CreateMonitoredItemsResponse UA_EXPORT
  258. UA_Client_MonitoredItems_createDataChanges(UA_Client *client,
  259. const UA_CreateMonitoredItemsRequest request, void **contexts,
  260. UA_Client_DataChangeNotificationCallback *callbacks,
  261. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
  262. UA_CreateMonitoredItemsResponse response;
  263. __UA_Client_MonitoredItems_create(client, &request, contexts,
  264. (void**)(uintptr_t)callbacks, deleteCallbacks, &response);
  265. return response;
  266. }
  267. UA_MonitoredItemCreateResult UA_EXPORT
  268. UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId,
  269. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  270. void *context, UA_Client_DataChangeNotificationCallback callback,
  271. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  272. UA_CreateMonitoredItemsRequest request;
  273. UA_CreateMonitoredItemsRequest_init(&request);
  274. request.subscriptionId = subscriptionId;
  275. request.timestampsToReturn = timestampsToReturn;
  276. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  277. request.itemsToCreateSize = 1;
  278. UA_CreateMonitoredItemsResponse response =
  279. UA_Client_MonitoredItems_createDataChanges(client, request, &context,
  280. &callback, &deleteCallback);
  281. UA_MonitoredItemCreateResult result;
  282. UA_MonitoredItemCreateResult_init(&result);
  283. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  284. result.statusCode = response.responseHeader.serviceResult;
  285. if(result.statusCode == UA_STATUSCODE_GOOD &&
  286. response.resultsSize != 1)
  287. result.statusCode = UA_STATUSCODE_BADINTERNALERROR;
  288. if(result.statusCode == UA_STATUSCODE_GOOD)
  289. UA_MonitoredItemCreateResult_copy(&response.results[0] , &result);
  290. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  291. return result;
  292. }
  293. UA_CreateMonitoredItemsResponse UA_EXPORT
  294. UA_Client_MonitoredItems_createEvents(UA_Client *client,
  295. const UA_CreateMonitoredItemsRequest request, void **contexts,
  296. UA_Client_EventNotificationCallback *callback,
  297. UA_Client_DeleteMonitoredItemCallback *deleteCallback) {
  298. UA_CreateMonitoredItemsResponse response;
  299. __UA_Client_MonitoredItems_create(client, &request, contexts,
  300. (void**)(uintptr_t)callback, deleteCallback, &response);
  301. return response;
  302. }
  303. UA_MonitoredItemCreateResult UA_EXPORT
  304. UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId,
  305. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  306. void *context, UA_Client_EventNotificationCallback callback,
  307. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  308. UA_CreateMonitoredItemsRequest request;
  309. UA_CreateMonitoredItemsRequest_init(&request);
  310. request.subscriptionId = subscriptionId;
  311. request.timestampsToReturn = timestampsToReturn;
  312. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  313. request.itemsToCreateSize = 1;
  314. UA_CreateMonitoredItemsResponse response =
  315. UA_Client_MonitoredItems_createEvents(client, request, &context,
  316. &callback, &deleteCallback);
  317. UA_StatusCode retval = response.responseHeader.serviceResult;
  318. UA_MonitoredItemCreateResult result;
  319. UA_MonitoredItemCreateResult_init(&result);
  320. if(retval != UA_STATUSCODE_GOOD) {
  321. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  322. result.statusCode = retval;
  323. return result;
  324. }
  325. UA_MonitoredItemCreateResult_copy(response.results , &result);
  326. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  327. return result;
  328. }
  329. UA_DeleteMonitoredItemsResponse UA_EXPORT
  330. UA_Client_MonitoredItems_delete(UA_Client *client, const UA_DeleteMonitoredItemsRequest request) {
  331. /* Send the request */
  332. UA_DeleteMonitoredItemsResponse response;
  333. __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
  334. &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]);
  335. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  336. return response;
  337. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  338. if(!sub) {
  339. UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  340. "No internal representation of subscription %u",
  341. request.subscriptionId);
  342. return response;
  343. }
  344. /* Loop over deleted MonitoredItems */
  345. for(size_t i = 0; i < response.resultsSize; i++) {
  346. if(response.results[i] != UA_STATUSCODE_GOOD &&
  347. response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  348. continue;
  349. }
  350. #ifndef __clang_analyzer__
  351. /* Delete the internal representation */
  352. UA_Client_MonitoredItem *mon;
  353. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  354. // NOLINTNEXTLINE
  355. if (mon->monitoredItemId == request.monitoredItemIds[i]) {
  356. UA_Client_MonitoredItem_remove(client, sub, mon);
  357. break;
  358. }
  359. }
  360. #endif
  361. }
  362. return response;
  363. }
  364. UA_StatusCode UA_EXPORT
  365. UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId) {
  366. UA_DeleteMonitoredItemsRequest request;
  367. UA_DeleteMonitoredItemsRequest_init(&request);
  368. request.subscriptionId = subscriptionId;
  369. request.monitoredItemIds = &monitoredItemId;
  370. request.monitoredItemIdsSize = 1;
  371. UA_DeleteMonitoredItemsResponse response =
  372. UA_Client_MonitoredItems_delete(client, request);
  373. UA_StatusCode retval = response.responseHeader.serviceResult;
  374. if(retval != UA_STATUSCODE_GOOD) {
  375. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  376. return retval;
  377. }
  378. if(response.resultsSize != 1) {
  379. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  380. return UA_STATUSCODE_BADINTERNALERROR;
  381. }
  382. retval = response.results[0];
  383. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  384. return retval;
  385. }
  386. UA_ModifyMonitoredItemsResponse UA_EXPORT
  387. UA_Client_MonitoredItems_modify(UA_Client *client,
  388. const UA_ModifyMonitoredItemsRequest request) {
  389. UA_ModifyMonitoredItemsResponse response;
  390. UA_Client_Subscription *sub = 0;
  391. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  392. if (sub->subscriptionId == request.subscriptionId)
  393. break;
  394. }
  395. if (!sub) {
  396. UA_ModifyMonitoredItemsResponse_init(&response);
  397. response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  398. return response;
  399. }
  400. UA_ModifyMonitoredItemsRequest modifiedRequest;
  401. UA_ModifyMonitoredItemsRequest_copy(&request, &modifiedRequest);
  402. for (size_t i = 0; i < modifiedRequest.itemsToModifySize; ++i) {
  403. UA_Client_MonitoredItem *mon = 0;
  404. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  405. if(mon->monitoredItemId == modifiedRequest.itemsToModify[i].monitoredItemId) {
  406. modifiedRequest.itemsToModify[i].requestedParameters.clientHandle = mon->clientHandle;
  407. break;
  408. }
  409. }
  410. }
  411. __UA_Client_Service(client,
  412. &modifiedRequest, &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSREQUEST],
  413. &response, &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSRESPONSE]);
  414. UA_ModifyMonitoredItemsRequest_deleteMembers(&modifiedRequest);
  415. return response;
  416. }
  417. /*************************************/
  418. /* Async Processing of Notifications */
  419. /*************************************/
  420. /* Assume the request is already initialized */
  421. UA_StatusCode
  422. UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) {
  423. /* Count acks */
  424. UA_Client_NotificationsAckNumber *ack;
  425. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  426. ++request->subscriptionAcknowledgementsSize;
  427. /* Create the array. Returns a sentinel pointer if the length is zero. */
  428. request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
  429. UA_Array_new(request->subscriptionAcknowledgementsSize,
  430. &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]);
  431. if(!request->subscriptionAcknowledgements) {
  432. request->subscriptionAcknowledgementsSize = 0;
  433. return UA_STATUSCODE_BADOUTOFMEMORY;
  434. }
  435. size_t i = 0;
  436. UA_Client_NotificationsAckNumber *ack_tmp;
  437. LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) {
  438. request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  439. request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  440. ++i;
  441. LIST_REMOVE(ack, listEntry);
  442. UA_free(ack);
  443. }
  444. return UA_STATUSCODE_GOOD;
  445. }
  446. /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */
  447. /* The value 0 is never used for the sequence number */
  448. static UA_UInt32
  449. UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) {
  450. UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
  451. if(nextSequenceNumber == 0)
  452. nextSequenceNumber = 1;
  453. return nextSequenceNumber;
  454. }
  455. static void
  456. processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub,
  457. UA_DataChangeNotification *dataChangeNotification) {
  458. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  459. UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j];
  460. /* Find the MonitoredItem */
  461. UA_Client_MonitoredItem *mon;
  462. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  463. if(mon->clientHandle == min->clientHandle)
  464. break;
  465. }
  466. if(!mon) {
  467. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  468. "Could not process a notification with clienthandle %u on subscription %u",
  469. min->clientHandle, sub->subscriptionId);
  470. continue;
  471. }
  472. if(mon->isEventMonitoredItem) {
  473. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  474. "MonitoredItem is configured for Events. But received a "
  475. "DataChangeNotification.");
  476. continue;
  477. }
  478. mon->handler.dataChangeCallback(client, sub->subscriptionId, sub->context,
  479. mon->monitoredItemId, mon->context,
  480. &min->value);
  481. }
  482. }
  483. static void
  484. processEventNotification(UA_Client *client, UA_Client_Subscription *sub,
  485. UA_EventNotificationList *eventNotificationList) {
  486. for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
  487. UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
  488. /* Find the MonitoredItem */
  489. UA_Client_MonitoredItem *mon;
  490. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  491. if(mon->clientHandle == eventFieldList->clientHandle)
  492. break;
  493. }
  494. if(!mon) {
  495. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  496. "Could not process a notification with clienthandle %u on subscription %u",
  497. eventFieldList->clientHandle, sub->subscriptionId);
  498. continue;
  499. }
  500. if(!mon->isEventMonitoredItem) {
  501. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  502. "MonitoredItem is configured for DataChanges. But received a "
  503. "EventNotification.");
  504. continue;
  505. }
  506. mon->handler.eventCallback(client, sub->subscriptionId, sub->context,
  507. mon->monitoredItemId, mon->context,
  508. eventFieldList->eventFieldsSize,
  509. eventFieldList->eventFields);
  510. }
  511. }
  512. static void
  513. processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub,
  514. UA_ExtensionObject *msg) {
  515. if(msg->encoding != UA_EXTENSIONOBJECT_DECODED)
  516. return;
  517. /* Handle DataChangeNotification */
  518. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  519. UA_DataChangeNotification *dataChangeNotification =
  520. (UA_DataChangeNotification *)msg->content.decoded.data;
  521. processDataChangeNotification(client, sub, dataChangeNotification);
  522. return;
  523. }
  524. /* Handle EventNotification */
  525. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
  526. UA_EventNotificationList *eventNotificationList =
  527. (UA_EventNotificationList *)msg->content.decoded.data;
  528. processEventNotification(client, sub, eventNotificationList);
  529. return;
  530. }
  531. /* Handle StatusChangeNotification */
  532. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) {
  533. if(sub->statusChangeCallback) {
  534. sub->statusChangeCallback(client, sub->subscriptionId, sub->context,
  535. (UA_StatusChangeNotification*)msg->content.decoded.data);
  536. } else {
  537. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  538. "Dropped a StatusChangeNotification since no callback is registered");
  539. }
  540. return;
  541. }
  542. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  543. "Unknown notification message type");
  544. }
  545. void
  546. UA_Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  547. UA_PublishResponse *response) {
  548. UA_NotificationMessage *msg = &response->notificationMessage;
  549. client->currentlyOutStandingPublishRequests--;
  550. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS) {
  551. if(client->config.outStandingPublishRequests > 1) {
  552. client->config.outStandingPublishRequests--;
  553. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  554. "Too many publishrequest, reduce outStandingPublishRequests to %d",
  555. client->config.outStandingPublishRequests);
  556. } else {
  557. UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  558. "Too many publishrequest when outStandingPublishRequests = 1");
  559. UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId);
  560. }
  561. return;
  562. }
  563. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSHUTDOWN)
  564. return;
  565. if(!LIST_FIRST(&client->subscriptions)) {
  566. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
  567. return;
  568. }
  569. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONCLOSED) {
  570. if(client->state >= UA_CLIENTSTATE_SESSION) {
  571. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  572. "Received Publish Response with code %s",
  573. UA_StatusCode_name(response->responseHeader.serviceResult));
  574. UA_Client_Subscription* sub = findSubscription(client, response->subscriptionId);
  575. if (sub != NULL)
  576. UA_Client_Subscription_deleteInternal(client, sub);
  577. }
  578. return;
  579. }
  580. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID) {
  581. UA_Client_disconnect(client); /* TODO: This should be handled before the process callback */
  582. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  583. "Received BadSessionIdInvalid");
  584. return;
  585. }
  586. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTIMEOUT) {
  587. if (client->config.inactivityCallback)
  588. client->config.inactivityCallback(client);
  589. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  590. "Received Timeout for Publish Response");
  591. return;
  592. }
  593. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  594. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  595. "Received Publish Response with code %s",
  596. UA_StatusCode_name(response->responseHeader.serviceResult));
  597. return;
  598. }
  599. UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
  600. if(!sub) {
  601. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  602. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  603. "Received Publish Response for a non-existant subscription");
  604. return;
  605. }
  606. sub->lastActivity = UA_DateTime_nowMonotonic();
  607. /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */
  608. if(UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber) {
  609. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  610. "Invalid subscription sequence number: expected %u but got %u",
  611. UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber),
  612. msg->sequenceNumber);
  613. /* This is an error. But we do not abort the connection. Some server
  614. * SDKs misbehave from time to time and send out-of-order sequence
  615. * numbers. (Probably some multi-threading synchronization issue.) */
  616. /* UA_Client_disconnect(client);
  617. return; */
  618. }
  619. /* According to f), a keep-alive message contains no notifications and has the sequence number
  620. * of the next NotificationMessage that is to be sent => More than one consecutive keep-alive
  621. * message or a NotificationMessage following a keep-alive message will share the same sequence
  622. * number. */
  623. if (msg->notificationDataSize)
  624. sub->sequenceNumber = msg->sequenceNumber;
  625. /* Process the notification messages */
  626. for(size_t k = 0; k < msg->notificationDataSize; ++k)
  627. processNotificationMessage(client, sub, &msg->notificationData[k]);
  628. /* Add to the list of pending acks */
  629. for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) {
  630. if(response->availableSequenceNumbers[i] != msg->sequenceNumber)
  631. continue;
  632. UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*)
  633. UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  634. if(!tmpAck) {
  635. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  636. "Not enough memory to store the acknowledgement for a publish "
  637. "message on subscription %u", sub->subscriptionId);
  638. break;
  639. }
  640. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  641. tmpAck->subAck.subscriptionId = sub->subscriptionId;
  642. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  643. break;
  644. }
  645. }
  646. static void
  647. processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId,
  648. void *response) {
  649. UA_PublishRequest *req = (UA_PublishRequest*)userdata;
  650. UA_PublishResponse *res = (UA_PublishResponse*)response;
  651. /* Process the response */
  652. UA_Client_Subscriptions_processPublishResponse(client, req, res);
  653. /* Delete the cached request */
  654. UA_PublishRequest_delete(req);
  655. /* Fill up the outstanding publish requests */
  656. UA_Client_Subscriptions_backgroundPublish(client);
  657. }
  658. void
  659. UA_Client_Subscriptions_clean(UA_Client *client) {
  660. UA_Client_NotificationsAckNumber *n, *tmp;
  661. LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
  662. LIST_REMOVE(n, listEntry);
  663. UA_free(n);
  664. }
  665. UA_Client_Subscription *sub, *tmps;
  666. LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
  667. UA_Client_Subscription_deleteInternal(client, sub); /* force local removal */
  668. client->monitoredItemHandles = 0;
  669. }
  670. void
  671. UA_Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) {
  672. if(client->state < UA_CLIENTSTATE_SESSION)
  673. return;
  674. /* Is the lack of responses the client's fault? */
  675. if(client->currentlyOutStandingPublishRequests == 0)
  676. return;
  677. UA_Client_Subscription *sub;
  678. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  679. UA_DateTime maxSilence = (UA_DateTime)
  680. ((sub->publishingInterval * sub->maxKeepAliveCount) +
  681. client->config.timeout) * UA_DATETIME_MSEC;
  682. if(maxSilence + sub->lastActivity < UA_DateTime_nowMonotonic()) {
  683. /* Reset activity */
  684. sub->lastActivity = UA_DateTime_nowMonotonic();
  685. if(client->config.subscriptionInactivityCallback)
  686. client->config.subscriptionInactivityCallback(client, sub->subscriptionId,
  687. sub->context);
  688. UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  689. "Inactivity for Subscription %u.", sub->subscriptionId);
  690. }
  691. }
  692. }
  693. UA_StatusCode
  694. UA_Client_Subscriptions_backgroundPublish(UA_Client *client) {
  695. if(client->state < UA_CLIENTSTATE_SESSION)
  696. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  697. /* The session must have at least one subscription */
  698. if(!LIST_FIRST(&client->subscriptions))
  699. return UA_STATUSCODE_GOOD;
  700. while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) {
  701. UA_PublishRequest *request = UA_PublishRequest_new();
  702. if (!request)
  703. return UA_STATUSCODE_BADOUTOFMEMORY;
  704. request->requestHeader.timeoutHint=60000;
  705. UA_StatusCode retval = UA_Client_preparePublishRequest(client, request);
  706. if(retval != UA_STATUSCODE_GOOD) {
  707. UA_PublishRequest_delete(request);
  708. return retval;
  709. }
  710. UA_UInt32 requestId;
  711. client->currentlyOutStandingPublishRequests++;
  712. /* Disable the timeout, it is treat in UA_Client_Subscriptions_backgroundPublishInactivityCheck */
  713. retval = __UA_Client_AsyncServiceEx(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
  714. processPublishResponseAsync,
  715. &UA_TYPES[UA_TYPES_PUBLISHRESPONSE],
  716. (void*)request, &requestId, 0);
  717. if(retval != UA_STATUSCODE_GOOD) {
  718. UA_PublishRequest_delete(request);
  719. return retval;
  720. }
  721. }
  722. return UA_STATUSCODE_GOOD;
  723. }
  724. #endif /* UA_ENABLE_SUBSCRIPTIONS */