ua_client_highlevel_subscriptions.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. #include "ua_client_highlevel.h"
  5. #include "ua_client_internal.h"
  6. #include "ua_util.h"
  7. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  8. UA_StatusCode
  9. UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
  10. UA_UInt32 *newSubscriptionId) {
  11. UA_CreateSubscriptionRequest request;
  12. UA_CreateSubscriptionRequest_init(&request);
  13. request.requestedPublishingInterval = settings.requestedPublishingInterval;
  14. request.requestedLifetimeCount = settings.requestedLifetimeCount;
  15. request.requestedMaxKeepAliveCount = settings.requestedMaxKeepAliveCount;
  16. request.maxNotificationsPerPublish = settings.maxNotificationsPerPublish;
  17. request.publishingEnabled = settings.publishingEnabled;
  18. request.priority = settings.priority;
  19. UA_CreateSubscriptionResponse response = UA_Client_Service_createSubscription(client, request);
  20. UA_StatusCode retval = response.responseHeader.serviceResult;
  21. if(retval != UA_STATUSCODE_GOOD) {
  22. UA_CreateSubscriptionResponse_deleteMembers(&response);
  23. return retval;
  24. }
  25. UA_Client_Subscription *newSub = (UA_Client_Subscription *)UA_malloc(sizeof(UA_Client_Subscription));
  26. if(!newSub) {
  27. UA_CreateSubscriptionResponse_deleteMembers(&response);
  28. return UA_STATUSCODE_BADOUTOFMEMORY;
  29. }
  30. LIST_INIT(&newSub->monitoredItems);
  31. newSub->lifeTime = response.revisedLifetimeCount;
  32. newSub->keepAliveCount = response.revisedMaxKeepAliveCount;
  33. newSub->publishingInterval = response.revisedPublishingInterval;
  34. newSub->subscriptionID = response.subscriptionId;
  35. newSub->notificationsPerPublish = request.maxNotificationsPerPublish;
  36. newSub->priority = request.priority;
  37. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  38. if(newSubscriptionId)
  39. *newSubscriptionId = newSub->subscriptionID;
  40. UA_CreateSubscriptionResponse_deleteMembers(&response);
  41. return UA_STATUSCODE_GOOD;
  42. }
  43. static UA_Client_Subscription *findSubscription(const UA_Client *client, UA_UInt32 subscriptionId)
  44. {
  45. UA_Client_Subscription *sub = NULL;
  46. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  47. if(sub->subscriptionID == subscriptionId)
  48. break;
  49. }
  50. return sub;
  51. }
  52. /* remove the subscription remotely */
  53. UA_StatusCode
  54. UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
  55. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  56. if(!sub)
  57. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  58. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  59. UA_Client_MonitoredItem *mon, *tmpmon;
  60. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmpmon) {
  61. retval =
  62. UA_Client_Subscriptions_removeMonitoredItem(client, sub->subscriptionID,
  63. mon->monitoredItemId);
  64. if(retval != UA_STATUSCODE_GOOD)
  65. return retval;
  66. }
  67. /* remove the subscription remotely */
  68. UA_DeleteSubscriptionsRequest request;
  69. UA_DeleteSubscriptionsRequest_init(&request);
  70. request.subscriptionIdsSize = 1;
  71. request.subscriptionIds = &sub->subscriptionID;
  72. UA_DeleteSubscriptionsResponse response = UA_Client_Service_deleteSubscriptions(client, request);
  73. retval = response.responseHeader.serviceResult;
  74. if(retval == UA_STATUSCODE_GOOD && response.resultsSize > 0)
  75. retval = response.results[0];
  76. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  77. if(retval != UA_STATUSCODE_GOOD && retval != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  78. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  79. "Could not remove subscription %u with error code %s",
  80. sub->subscriptionID, UA_StatusCode_name(retval));
  81. return retval;
  82. }
  83. UA_Client_Subscriptions_forceDelete(client, sub);
  84. return UA_STATUSCODE_GOOD;
  85. }
  86. void
  87. UA_Client_Subscriptions_forceDelete(UA_Client *client,
  88. UA_Client_Subscription *sub) {
  89. UA_Client_MonitoredItem *mon, *mon_tmp;
  90. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) {
  91. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  92. LIST_REMOVE(mon, listEntry);
  93. UA_free(mon);
  94. }
  95. LIST_REMOVE(sub, listEntry);
  96. UA_free(sub);
  97. }
  98. UA_StatusCode
  99. UA_Client_Subscriptions_addMonitoredEvent(UA_Client *client, const UA_UInt32 subscriptionId,
  100. const UA_NodeId nodeId, const UA_UInt32 attributeID,
  101. UA_SimpleAttributeOperand *selectClause,
  102. const size_t nSelectClauses,
  103. UA_ContentFilterElement *whereClause,
  104. const size_t nWhereClauses,
  105. const UA_MonitoredEventHandlingFunction hf,
  106. void *hfContext, UA_UInt32 *newMonitoredItemId) {
  107. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  108. if(!sub)
  109. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  110. /* Send the request */
  111. UA_CreateMonitoredItemsRequest request;
  112. UA_CreateMonitoredItemsRequest_init(&request);
  113. request.subscriptionId = subscriptionId;
  114. UA_MonitoredItemCreateRequest item;
  115. UA_MonitoredItemCreateRequest_init(&item);
  116. item.itemToMonitor.nodeId = nodeId;
  117. item.itemToMonitor.attributeId = attributeID;
  118. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  119. item.requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  120. item.requestedParameters.samplingInterval = 0;
  121. item.requestedParameters.discardOldest = false;
  122. UA_EventFilter *evFilter = UA_EventFilter_new();
  123. if(!evFilter) {
  124. return UA_STATUSCODE_BADOUTOFMEMORY;
  125. }
  126. UA_EventFilter_init(evFilter);
  127. evFilter->selectClausesSize = nSelectClauses;
  128. evFilter->selectClauses = selectClause;
  129. evFilter->whereClause.elementsSize = nWhereClauses;
  130. evFilter->whereClause.elements = whereClause;
  131. item.requestedParameters.filter.encoding = UA_EXTENSIONOBJECT_DECODED_NODELETE;
  132. item.requestedParameters.filter.content.decoded.type = &UA_TYPES[UA_TYPES_EVENTFILTER];
  133. item.requestedParameters.filter.content.decoded.data = evFilter;
  134. request.itemsToCreate = &item;
  135. request.itemsToCreateSize = 1;
  136. UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
  137. // slight misuse of retval here to check if the deletion was successfull.
  138. UA_StatusCode retval;
  139. if(response.resultsSize == 0)
  140. retval = response.responseHeader.serviceResult;
  141. else
  142. retval = response.results[0].statusCode;
  143. if(retval != UA_STATUSCODE_GOOD) {
  144. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  145. UA_EventFilter_delete(evFilter);
  146. return retval;
  147. }
  148. /* Create the handler */
  149. UA_Client_MonitoredItem *newMon = (UA_Client_MonitoredItem *)UA_malloc(sizeof(UA_Client_MonitoredItem));
  150. if(!newMon) {
  151. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  152. UA_EventFilter_delete(evFilter);
  153. return UA_STATUSCODE_BADOUTOFMEMORY;
  154. }
  155. newMon->monitoringMode = UA_MONITORINGMODE_REPORTING;
  156. UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId);
  157. newMon->attributeID = attributeID;
  158. newMon->clientHandle = client->monitoredItemHandles;
  159. newMon->samplingInterval = 0;
  160. newMon->queueSize = 0;
  161. newMon->discardOldest = false;
  162. newMon->handlerEvents = hf;
  163. newMon->handlerEventsContext = hfContext;
  164. newMon->monitoredItemId = response.results[0].monitoredItemId;
  165. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  166. *newMonitoredItemId = newMon->monitoredItemId;
  167. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  168. "Created a monitored item with client handle %u", client->monitoredItemHandles);
  169. UA_EventFilter_delete(evFilter);
  170. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  171. return UA_STATUSCODE_GOOD;
  172. }
  173. UA_StatusCode
  174. UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  175. UA_NodeId nodeId, UA_UInt32 attributeID,
  176. UA_MonitoredItemHandlingFunction hf,
  177. void *hfContext, UA_UInt32 *newMonitoredItemId) {
  178. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  179. if(!sub)
  180. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  181. /* Create the handler */
  182. UA_Client_MonitoredItem *newMon = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem));
  183. if(!newMon)
  184. return UA_STATUSCODE_BADOUTOFMEMORY;
  185. /* Send the request */
  186. UA_CreateMonitoredItemsRequest request;
  187. UA_CreateMonitoredItemsRequest_init(&request);
  188. request.subscriptionId = subscriptionId;
  189. UA_MonitoredItemCreateRequest item;
  190. UA_MonitoredItemCreateRequest_init(&item);
  191. item.itemToMonitor.nodeId = nodeId;
  192. item.itemToMonitor.attributeId = attributeID;
  193. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  194. item.requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  195. item.requestedParameters.samplingInterval = sub->publishingInterval;
  196. item.requestedParameters.discardOldest = true;
  197. item.requestedParameters.queueSize = 1;
  198. request.itemsToCreate = &item;
  199. request.itemsToCreateSize = 1;
  200. UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
  201. // slight misuse of retval here to check if the addition was successfull.
  202. UA_StatusCode retval = response.responseHeader.serviceResult;
  203. if(retval == UA_STATUSCODE_GOOD) {
  204. if(response.resultsSize == 1)
  205. retval = response.results[0].statusCode;
  206. else
  207. retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
  208. }
  209. if(retval != UA_STATUSCODE_GOOD) {
  210. UA_free(newMon);
  211. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  212. return retval;
  213. }
  214. /* Set the handler */
  215. newMon->monitoringMode = UA_MONITORINGMODE_REPORTING;
  216. UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId);
  217. newMon->attributeID = attributeID;
  218. newMon->clientHandle = client->monitoredItemHandles;
  219. newMon->samplingInterval = sub->publishingInterval;
  220. newMon->queueSize = 1;
  221. newMon->discardOldest = true;
  222. newMon->handler = hf;
  223. newMon->handlerContext = hfContext;
  224. newMon->monitoredItemId = response.results[0].monitoredItemId;
  225. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  226. *newMonitoredItemId = newMon->monitoredItemId;
  227. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  228. "Created a monitored item with client handle %u",
  229. client->monitoredItemHandles);
  230. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  231. return UA_STATUSCODE_GOOD;
  232. }
  233. UA_StatusCode
  234. UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  235. UA_UInt32 monitoredItemId) {
  236. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  237. if(!sub)
  238. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  239. UA_Client_MonitoredItem *mon;
  240. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  241. if(mon->monitoredItemId == monitoredItemId)
  242. break;
  243. }
  244. if(!mon)
  245. return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
  246. /* remove the monitoreditem remotely */
  247. UA_DeleteMonitoredItemsRequest request;
  248. UA_DeleteMonitoredItemsRequest_init(&request);
  249. request.subscriptionId = sub->subscriptionID;
  250. request.monitoredItemIdsSize = 1;
  251. request.monitoredItemIds = &mon->monitoredItemId;
  252. UA_DeleteMonitoredItemsResponse response = UA_Client_Service_deleteMonitoredItems(client, request);
  253. UA_StatusCode retval = response.responseHeader.serviceResult;
  254. if(retval == UA_STATUSCODE_GOOD && response.resultsSize > 1)
  255. retval = response.results[0];
  256. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  257. if(retval != UA_STATUSCODE_GOOD &&
  258. retval != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  259. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  260. "Could not remove monitoreditem %u with error code %s",
  261. monitoredItemId, UA_StatusCode_name(retval));
  262. return retval;
  263. }
  264. LIST_REMOVE(mon, listEntry);
  265. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  266. UA_free(mon);
  267. return UA_STATUSCODE_GOOD;
  268. }
  269. static void
  270. UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  271. UA_PublishResponse *response) {
  272. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  273. return;
  274. UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
  275. if(!sub)
  276. return;
  277. /* Check if the server has acknowledged any of the sent ACKs */
  278. for(size_t i = 0; i < response->resultsSize && i < request->subscriptionAcknowledgementsSize; ++i) {
  279. /* remove also acks that are unknown to the server */
  280. if(response->results[i] != UA_STATUSCODE_GOOD &&
  281. response->results[i] != UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN)
  282. continue;
  283. /* Remove the ack from the list */
  284. UA_SubscriptionAcknowledgement *orig_ack = &request->subscriptionAcknowledgements[i];
  285. UA_Client_NotificationsAckNumber *ack;
  286. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  287. if(ack->subAck.subscriptionId == orig_ack->subscriptionId &&
  288. ack->subAck.sequenceNumber == orig_ack->sequenceNumber) {
  289. LIST_REMOVE(ack, listEntry);
  290. UA_free(ack);
  291. UA_assert(ack != LIST_FIRST(&client->pendingNotificationsAcks));
  292. break;
  293. }
  294. }
  295. }
  296. /* Process the notification messages */
  297. UA_NotificationMessage *msg = &response->notificationMessage;
  298. for(size_t k = 0; k < msg->notificationDataSize; ++k) {
  299. if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
  300. continue;
  301. if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  302. UA_DataChangeNotification *dataChangeNotification = (UA_DataChangeNotification *)msg->notificationData[k].content.decoded.data;
  303. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  304. UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
  305. UA_Client_MonitoredItem *mon;
  306. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  307. if(mon->clientHandle == mitemNot->clientHandle) {
  308. mon->handler(mon->monitoredItemId, &mitemNot->value, mon->handlerContext);
  309. break;
  310. }
  311. }
  312. if(!mon)
  313. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  314. "Could not process a notification with clienthandle %u on subscription %u",
  315. mitemNot->clientHandle, sub->subscriptionID);
  316. }
  317. }
  318. else if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
  319. UA_EventNotificationList *eventNotificationList = (UA_EventNotificationList *)msg->notificationData[k].content.decoded.data;
  320. for (size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
  321. UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
  322. UA_Client_MonitoredItem *mon;
  323. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  324. if(mon->clientHandle == eventFieldList->clientHandle) {
  325. mon->handlerEvents(mon->monitoredItemId, eventFieldList->eventFieldsSize,
  326. eventFieldList->eventFields, mon->handlerContext);
  327. break;
  328. }
  329. }
  330. if(!mon)
  331. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  332. "Could not process a notification with clienthandle %u on subscription %u",
  333. eventFieldList->clientHandle, sub->subscriptionID);
  334. }
  335. }
  336. else {
  337. continue; // no other types are supported
  338. }
  339. }
  340. /* Add to the list of pending acks */
  341. UA_Client_NotificationsAckNumber *tmpAck =
  342. (UA_Client_NotificationsAckNumber*)UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  343. if(!tmpAck) {
  344. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  345. "Not enough memory to store the acknowledgement for a publish "
  346. "message on subscription %u", sub->subscriptionID);
  347. return;
  348. }
  349. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  350. tmpAck->subAck.subscriptionId = sub->subscriptionID;
  351. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  352. }
  353. UA_StatusCode
  354. UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
  355. if(client->state < UA_CLIENTSTATE_SESSION)
  356. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  357. UA_Boolean moreNotifications = true;
  358. while(moreNotifications) {
  359. UA_PublishRequest request;
  360. UA_PublishRequest_init(&request);
  361. request.subscriptionAcknowledgementsSize = 0;
  362. UA_Client_NotificationsAckNumber *ack;
  363. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  364. ++request.subscriptionAcknowledgementsSize;
  365. if(request.subscriptionAcknowledgementsSize > 0) {
  366. request.subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
  367. UA_malloc(sizeof(UA_SubscriptionAcknowledgement) * request.subscriptionAcknowledgementsSize);
  368. if(!request.subscriptionAcknowledgements)
  369. return UA_STATUSCODE_BADOUTOFMEMORY;
  370. }
  371. int i = 0;
  372. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  373. request.subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  374. request.subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  375. ++i;
  376. }
  377. UA_PublishResponse response = UA_Client_Service_publish(client, request);
  378. UA_Client_processPublishResponse(client, &request, &response);
  379. moreNotifications = response.moreNotifications;
  380. UA_PublishResponse_deleteMembers(&response);
  381. UA_PublishRequest_deleteMembers(&request);
  382. }
  383. return UA_STATUSCODE_GOOD;
  384. }
  385. #endif /* UA_ENABLE_SUBSCRIPTIONS */