ua_client_highlevel_subscriptions.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. #include "ua_client_highlevel.h"
  5. #include "ua_client_internal.h"
  6. #include "ua_util.h"
  7. #include "ua_types_generated_encoding_binary.h"
  8. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  9. UA_StatusCode
  10. UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
  11. UA_UInt32 *newSubscriptionId) {
  12. UA_CreateSubscriptionRequest request;
  13. UA_CreateSubscriptionRequest_init(&request);
  14. request.requestedPublishingInterval = settings.requestedPublishingInterval;
  15. request.requestedLifetimeCount = settings.requestedLifetimeCount;
  16. request.requestedMaxKeepAliveCount = settings.requestedMaxKeepAliveCount;
  17. request.maxNotificationsPerPublish = settings.maxNotificationsPerPublish;
  18. request.publishingEnabled = settings.publishingEnabled;
  19. request.priority = settings.priority;
  20. UA_CreateSubscriptionResponse response = UA_Client_Service_createSubscription(client, request);
  21. UA_StatusCode retval = response.responseHeader.serviceResult;
  22. if(retval != UA_STATUSCODE_GOOD) {
  23. UA_CreateSubscriptionResponse_deleteMembers(&response);
  24. return retval;
  25. }
  26. UA_Client_Subscription *newSub = (UA_Client_Subscription *)UA_malloc(sizeof(UA_Client_Subscription));
  27. if(!newSub) {
  28. UA_CreateSubscriptionResponse_deleteMembers(&response);
  29. return UA_STATUSCODE_BADOUTOFMEMORY;
  30. }
  31. LIST_INIT(&newSub->MonitoredItems);
  32. newSub->LifeTime = response.revisedLifetimeCount;
  33. newSub->KeepAliveCount = response.revisedMaxKeepAliveCount;
  34. newSub->PublishingInterval = response.revisedPublishingInterval;
  35. newSub->SubscriptionID = response.subscriptionId;
  36. newSub->NotificationsPerPublish = request.maxNotificationsPerPublish;
  37. newSub->Priority = request.priority;
  38. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  39. if(newSubscriptionId)
  40. *newSubscriptionId = newSub->SubscriptionID;
  41. UA_CreateSubscriptionResponse_deleteMembers(&response);
  42. return UA_STATUSCODE_GOOD;
  43. }
  44. /* remove the subscription remotely */
  45. UA_StatusCode
  46. UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
  47. UA_Client_Subscription *sub;
  48. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  49. if(sub->SubscriptionID == subscriptionId)
  50. break;
  51. }
  52. if(!sub)
  53. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  54. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  55. UA_Client_MonitoredItem *mon, *tmpmon;
  56. LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmpmon) {
  57. retval =
  58. UA_Client_Subscriptions_removeMonitoredItem(client, sub->SubscriptionID,
  59. mon->MonitoredItemId);
  60. if(retval != UA_STATUSCODE_GOOD)
  61. return retval;
  62. }
  63. /* remove the subscription remotely */
  64. UA_DeleteSubscriptionsRequest request;
  65. UA_DeleteSubscriptionsRequest_init(&request);
  66. request.subscriptionIdsSize = 1;
  67. request.subscriptionIds = &sub->SubscriptionID;
  68. UA_DeleteSubscriptionsResponse response = UA_Client_Service_deleteSubscriptions(client, request);
  69. retval = response.responseHeader.serviceResult;
  70. if(retval == UA_STATUSCODE_GOOD && response.resultsSize > 0)
  71. retval = response.results[0];
  72. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  73. if(retval != UA_STATUSCODE_GOOD && retval != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  74. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  75. "Could not remove subscription %u with statuscode 0x%08x",
  76. sub->SubscriptionID, retval);
  77. return retval;
  78. }
  79. UA_Client_Subscriptions_forceDelete(client, sub);
  80. return UA_STATUSCODE_GOOD;
  81. }
  82. void
  83. UA_Client_Subscriptions_forceDelete(UA_Client *client,
  84. UA_Client_Subscription *sub) {
  85. UA_Client_MonitoredItem *mon, *mon_tmp;
  86. LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, mon_tmp) {
  87. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  88. LIST_REMOVE(mon, listEntry);
  89. UA_free(mon);
  90. }
  91. LIST_REMOVE(sub, listEntry);
  92. UA_free(sub);
  93. }
  94. UA_StatusCode
  95. UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  96. UA_NodeId nodeId, UA_UInt32 attributeID,
  97. UA_MonitoredItemHandlingFunction hf,
  98. void *hfContext, UA_UInt32 *newMonitoredItemId) {
  99. UA_Client_Subscription *sub;
  100. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  101. if(sub->SubscriptionID == subscriptionId)
  102. break;
  103. }
  104. if(!sub)
  105. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  106. /* Send the request */
  107. UA_CreateMonitoredItemsRequest request;
  108. UA_CreateMonitoredItemsRequest_init(&request);
  109. request.subscriptionId = subscriptionId;
  110. UA_MonitoredItemCreateRequest item;
  111. UA_MonitoredItemCreateRequest_init(&item);
  112. item.itemToMonitor.nodeId = nodeId;
  113. item.itemToMonitor.attributeId = attributeID;
  114. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  115. item.requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  116. item.requestedParameters.samplingInterval = sub->PublishingInterval;
  117. item.requestedParameters.discardOldest = true;
  118. item.requestedParameters.queueSize = 1;
  119. request.itemsToCreate = &item;
  120. request.itemsToCreateSize = 1;
  121. UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
  122. // slight misuse of retval here to check if the deletion was successfull.
  123. UA_StatusCode retval;
  124. if(response.resultsSize == 0)
  125. retval = response.responseHeader.serviceResult;
  126. else
  127. retval = response.results[0].statusCode;
  128. if(retval != UA_STATUSCODE_GOOD) {
  129. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  130. return retval;
  131. }
  132. /* Create the handler */
  133. UA_Client_MonitoredItem *newMon = (UA_Client_MonitoredItem *)UA_malloc(sizeof(UA_Client_MonitoredItem));
  134. newMon->MonitoringMode = UA_MONITORINGMODE_REPORTING;
  135. UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId);
  136. newMon->AttributeID = attributeID;
  137. newMon->ClientHandle = client->monitoredItemHandles;
  138. newMon->SamplingInterval = sub->PublishingInterval;
  139. newMon->QueueSize = 1;
  140. newMon->DiscardOldest = true;
  141. newMon->handler = hf;
  142. newMon->handlerContext = hfContext;
  143. newMon->MonitoredItemId = response.results[0].monitoredItemId;
  144. LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
  145. *newMonitoredItemId = newMon->MonitoredItemId;
  146. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  147. "Created a monitored item with client handle %u", client->monitoredItemHandles);
  148. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  149. return UA_STATUSCODE_GOOD;
  150. }
  151. UA_StatusCode
  152. UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  153. UA_UInt32 monitoredItemId) {
  154. UA_Client_Subscription *sub;
  155. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  156. if(sub->SubscriptionID == subscriptionId)
  157. break;
  158. }
  159. if(!sub)
  160. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  161. UA_Client_MonitoredItem *mon;
  162. LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
  163. if(mon->MonitoredItemId == monitoredItemId)
  164. break;
  165. }
  166. if(!mon)
  167. return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
  168. /* remove the monitoreditem remotely */
  169. UA_DeleteMonitoredItemsRequest request;
  170. UA_DeleteMonitoredItemsRequest_init(&request);
  171. request.subscriptionId = sub->SubscriptionID;
  172. request.monitoredItemIdsSize = 1;
  173. request.monitoredItemIds = &mon->MonitoredItemId;
  174. UA_DeleteMonitoredItemsResponse response = UA_Client_Service_deleteMonitoredItems(client, request);
  175. UA_StatusCode retval = response.responseHeader.serviceResult;
  176. if(retval == UA_STATUSCODE_GOOD && response.resultsSize > 1)
  177. retval = response.results[0];
  178. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  179. if(retval != UA_STATUSCODE_GOOD &&
  180. retval != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  181. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  182. "Could not remove monitoreditem %u with statuscode 0x%08x",
  183. monitoredItemId, retval);
  184. return retval;
  185. }
  186. LIST_REMOVE(mon, listEntry);
  187. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  188. UA_free(mon);
  189. return UA_STATUSCODE_GOOD;
  190. }
  191. static void
  192. UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  193. UA_PublishResponse *response) {
  194. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  195. return;
  196. /* Find the subscription */
  197. UA_Client_Subscription *sub;
  198. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  199. if(sub->SubscriptionID == response->subscriptionId)
  200. break;
  201. }
  202. if(!sub)
  203. return;
  204. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  205. "Processing a publish response on subscription %u with %u notifications",
  206. sub->SubscriptionID, response->notificationMessage.notificationDataSize);
  207. /* Check if the server has acknowledged any of the sent ACKs */
  208. for(size_t i = 0; i < response->resultsSize && i < request->subscriptionAcknowledgementsSize; ++i) {
  209. /* remove also acks that are unknown to the server */
  210. if(response->results[i] != UA_STATUSCODE_GOOD &&
  211. response->results[i] != UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN)
  212. continue;
  213. /* Remove the ack from the list */
  214. UA_SubscriptionAcknowledgement *orig_ack = &request->subscriptionAcknowledgements[i];
  215. UA_Client_NotificationsAckNumber *ack;
  216. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  217. if(ack->subAck.subscriptionId == orig_ack->subscriptionId &&
  218. ack->subAck.sequenceNumber == orig_ack->sequenceNumber) {
  219. LIST_REMOVE(ack, listEntry);
  220. UA_free(ack);
  221. UA_assert(ack != LIST_FIRST(&client->pendingNotificationsAcks));
  222. break;
  223. }
  224. }
  225. }
  226. /* Process the notification messages */
  227. UA_NotificationMessage *msg = &response->notificationMessage;
  228. for(size_t k = 0; k < msg->notificationDataSize; ++k) {
  229. if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
  230. continue;
  231. /* Currently only dataChangeNotifications are supported */
  232. if(msg->notificationData[k].content.decoded.type != &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION])
  233. continue;
  234. UA_DataChangeNotification *dataChangeNotification = (UA_DataChangeNotification *)msg->notificationData[k].content.decoded.data;
  235. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  236. UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
  237. UA_Client_MonitoredItem *mon;
  238. LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
  239. if(mon->ClientHandle == mitemNot->clientHandle) {
  240. mon->handler(mon->MonitoredItemId, &mitemNot->value, mon->handlerContext);
  241. break;
  242. }
  243. }
  244. if(!mon)
  245. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  246. "Could not process a notification with clienthandle %u on subscription %u",
  247. mitemNot->clientHandle, sub->SubscriptionID);
  248. }
  249. }
  250. /* Add to the list of pending acks */
  251. UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber *)UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  252. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  253. tmpAck->subAck.subscriptionId = sub->SubscriptionID;
  254. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  255. }
  256. UA_StatusCode
  257. UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
  258. if (client->state == UA_CLIENTSTATE_ERRORED)
  259. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  260. UA_Boolean moreNotifications = true;
  261. while(moreNotifications) {
  262. UA_PublishRequest request;
  263. UA_PublishRequest_init(&request);
  264. request.subscriptionAcknowledgementsSize = 0;
  265. UA_Client_NotificationsAckNumber *ack;
  266. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  267. ++request.subscriptionAcknowledgementsSize;
  268. if(request.subscriptionAcknowledgementsSize > 0) {
  269. request.subscriptionAcknowledgements =
  270. (UA_SubscriptionAcknowledgement *)UA_malloc(sizeof(UA_SubscriptionAcknowledgement) * request.subscriptionAcknowledgementsSize);
  271. if(!request.subscriptionAcknowledgements)
  272. return UA_STATUSCODE_GOOD;
  273. }
  274. int i = 0;
  275. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  276. request.subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  277. request.subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  278. ++i;
  279. }
  280. UA_PublishResponse response = UA_Client_Service_publish(client, request);
  281. UA_Client_processPublishResponse(client, &request, &response);
  282. moreNotifications = response.moreNotifications;
  283. UA_PublishResponse_deleteMembers(&response);
  284. UA_PublishRequest_deleteMembers(&request);
  285. }
  286. return UA_STATUSCODE_GOOD;
  287. }
  288. #endif /* UA_ENABLE_SUBSCRIPTIONS */