ua_client_highlevel_subscriptions.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.*/
  4. #include "ua_client_highlevel.h"
  5. #include "ua_client_internal.h"
  6. #include "ua_util.h"
  7. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  8. UA_StatusCode
  9. UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
  10. UA_UInt32 *newSubscriptionId) {
  11. UA_CreateSubscriptionRequest request;
  12. UA_CreateSubscriptionRequest_init(&request);
  13. request.requestedPublishingInterval = settings.requestedPublishingInterval;
  14. request.requestedLifetimeCount = settings.requestedLifetimeCount;
  15. request.requestedMaxKeepAliveCount = settings.requestedMaxKeepAliveCount;
  16. request.maxNotificationsPerPublish = settings.maxNotificationsPerPublish;
  17. request.publishingEnabled = settings.publishingEnabled;
  18. request.priority = settings.priority;
  19. UA_CreateSubscriptionResponse response = UA_Client_Service_createSubscription(client, request);
  20. UA_StatusCode retval = response.responseHeader.serviceResult;
  21. if(retval != UA_STATUSCODE_GOOD)
  22. goto cleanup;
  23. UA_Client_Subscription *newSub = UA_malloc(sizeof(UA_Client_Subscription));
  24. if(!newSub) {
  25. retval = UA_STATUSCODE_BADOUTOFMEMORY;
  26. goto cleanup;
  27. }
  28. LIST_INIT(&newSub->monitoredItems);
  29. newSub->lifeTime = response.revisedLifetimeCount;
  30. newSub->keepAliveCount = response.revisedMaxKeepAliveCount;
  31. newSub->publishingInterval = response.revisedPublishingInterval;
  32. newSub->subscriptionID = response.subscriptionId;
  33. newSub->notificationsPerPublish = request.maxNotificationsPerPublish;
  34. newSub->priority = request.priority;
  35. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  36. if(newSubscriptionId)
  37. *newSubscriptionId = newSub->subscriptionID;
  38. cleanup:
  39. UA_CreateSubscriptionResponse_deleteMembers(&response);
  40. return retval;
  41. }
  42. /* remove the subscription remotely */
  43. UA_StatusCode
  44. UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
  45. UA_Client_Subscription *sub;
  46. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  47. if(sub->subscriptionID == subscriptionId)
  48. break;
  49. }
  50. if(!sub)
  51. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  52. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  53. UA_Client_MonitoredItem *mon, *tmpmon;
  54. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmpmon) {
  55. retval =
  56. UA_Client_Subscriptions_removeMonitoredItem(client, sub->subscriptionID,
  57. mon->monitoredItemId);
  58. if(retval != UA_STATUSCODE_GOOD)
  59. return retval;
  60. }
  61. /* remove the subscription remotely */
  62. UA_DeleteSubscriptionsRequest request;
  63. UA_DeleteSubscriptionsRequest_init(&request);
  64. request.subscriptionIdsSize = 1;
  65. request.subscriptionIds = &sub->subscriptionID;
  66. UA_DeleteSubscriptionsResponse response = UA_Client_Service_deleteSubscriptions(client, request);
  67. retval = response.responseHeader.serviceResult;
  68. if(retval == UA_STATUSCODE_GOOD && response.resultsSize > 0)
  69. retval = response.results[0];
  70. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  71. if(retval != UA_STATUSCODE_GOOD && retval != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  72. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  73. "Could not remove subscription %u with error code %s",
  74. sub->subscriptionID, UA_StatusCode_name(retval));
  75. return retval;
  76. }
  77. UA_Client_Subscriptions_forceDelete(client, sub);
  78. return UA_STATUSCODE_GOOD;
  79. }
  80. void
  81. UA_Client_Subscriptions_forceDelete(UA_Client *client,
  82. UA_Client_Subscription *sub) {
  83. UA_Client_MonitoredItem *mon, *mon_tmp;
  84. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) {
  85. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  86. LIST_REMOVE(mon, listEntry);
  87. UA_free(mon);
  88. }
  89. LIST_REMOVE(sub, listEntry);
  90. UA_free(sub);
  91. }
  92. UA_StatusCode
  93. UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  94. UA_NodeId nodeId, UA_UInt32 attributeID,
  95. UA_MonitoredItemHandlingFunction hf,
  96. void *hfContext, UA_UInt32 *newMonitoredItemId) {
  97. UA_Client_Subscription *sub;
  98. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  99. if(sub->subscriptionID == subscriptionId)
  100. break;
  101. }
  102. if(!sub)
  103. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  104. /* Create the handler */
  105. UA_Client_MonitoredItem *newMon = UA_malloc(sizeof(UA_Client_MonitoredItem));
  106. if(!newMon)
  107. return UA_STATUSCODE_BADOUTOFMEMORY;
  108. /* Send the request */
  109. UA_CreateMonitoredItemsRequest request;
  110. UA_CreateMonitoredItemsRequest_init(&request);
  111. request.subscriptionId = subscriptionId;
  112. UA_MonitoredItemCreateRequest item;
  113. UA_MonitoredItemCreateRequest_init(&item);
  114. item.itemToMonitor.nodeId = nodeId;
  115. item.itemToMonitor.attributeId = attributeID;
  116. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  117. item.requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  118. item.requestedParameters.samplingInterval = sub->publishingInterval;
  119. item.requestedParameters.discardOldest = true;
  120. item.requestedParameters.queueSize = 1;
  121. request.itemsToCreate = &item;
  122. request.itemsToCreateSize = 1;
  123. UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
  124. // slight misuse of retval here to check if the addition was successfull.
  125. UA_StatusCode retval = response.responseHeader.serviceResult;
  126. if(retval == UA_STATUSCODE_GOOD) {
  127. if(response.resultsSize == 1)
  128. retval = response.results[0].statusCode;
  129. else
  130. retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
  131. }
  132. if(retval != UA_STATUSCODE_GOOD) {
  133. UA_free(newMon);
  134. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  135. return retval;
  136. }
  137. /* Set the handler */
  138. newMon->monitoringMode = UA_MONITORINGMODE_REPORTING;
  139. UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId);
  140. newMon->attributeID = attributeID;
  141. newMon->clientHandle = client->monitoredItemHandles;
  142. newMon->samplingInterval = sub->publishingInterval;
  143. newMon->queueSize = 1;
  144. newMon->discardOldest = true;
  145. newMon->handler = hf;
  146. newMon->handlerContext = hfContext;
  147. newMon->monitoredItemId = response.results[0].monitoredItemId;
  148. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  149. *newMonitoredItemId = newMon->monitoredItemId;
  150. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  151. "Created a monitored item with client handle %u",
  152. client->monitoredItemHandles);
  153. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  154. return UA_STATUSCODE_GOOD;
  155. }
  156. UA_StatusCode
  157. UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  158. UA_UInt32 monitoredItemId) {
  159. UA_Client_Subscription *sub;
  160. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  161. if(sub->subscriptionID == subscriptionId)
  162. break;
  163. }
  164. if(!sub)
  165. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  166. UA_Client_MonitoredItem *mon;
  167. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  168. if(mon->monitoredItemId == monitoredItemId)
  169. break;
  170. }
  171. if(!mon)
  172. return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
  173. /* remove the monitoreditem remotely */
  174. UA_DeleteMonitoredItemsRequest request;
  175. UA_DeleteMonitoredItemsRequest_init(&request);
  176. request.subscriptionId = sub->subscriptionID;
  177. request.monitoredItemIdsSize = 1;
  178. request.monitoredItemIds = &mon->monitoredItemId;
  179. UA_DeleteMonitoredItemsResponse response = UA_Client_Service_deleteMonitoredItems(client, request);
  180. UA_StatusCode retval = response.responseHeader.serviceResult;
  181. if(retval == UA_STATUSCODE_GOOD && response.resultsSize > 1)
  182. retval = response.results[0];
  183. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  184. if(retval != UA_STATUSCODE_GOOD &&
  185. retval != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  186. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  187. "Could not remove monitoreditem %u with error code %s",
  188. monitoredItemId, UA_StatusCode_name(retval));
  189. return retval;
  190. }
  191. LIST_REMOVE(mon, listEntry);
  192. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  193. UA_free(mon);
  194. return UA_STATUSCODE_GOOD;
  195. }
  196. static void
  197. UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  198. UA_PublishResponse *response) {
  199. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  200. return;
  201. /* Find the subscription */
  202. UA_Client_Subscription *sub;
  203. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  204. if(sub->subscriptionID == response->subscriptionId)
  205. break;
  206. }
  207. if(!sub)
  208. return;
  209. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  210. "Processing a publish response on subscription %u with %u notifications",
  211. sub->subscriptionID, response->notificationMessage.notificationDataSize);
  212. /* Check if the server has acknowledged any of the sent ACKs */
  213. for(size_t i = 0; i < response->resultsSize && i < request->subscriptionAcknowledgementsSize; ++i) {
  214. /* remove also acks that are unknown to the server */
  215. if(response->results[i] != UA_STATUSCODE_GOOD &&
  216. response->results[i] != UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN)
  217. continue;
  218. /* Remove the ack from the list */
  219. UA_SubscriptionAcknowledgement *orig_ack = &request->subscriptionAcknowledgements[i];
  220. UA_Client_NotificationsAckNumber *ack;
  221. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  222. if(ack->subAck.subscriptionId == orig_ack->subscriptionId &&
  223. ack->subAck.sequenceNumber == orig_ack->sequenceNumber) {
  224. LIST_REMOVE(ack, listEntry);
  225. UA_free(ack);
  226. UA_assert(ack != LIST_FIRST(&client->pendingNotificationsAcks));
  227. break;
  228. }
  229. }
  230. }
  231. /* Process the notification messages */
  232. UA_NotificationMessage *msg = &response->notificationMessage;
  233. for(size_t k = 0; k < msg->notificationDataSize; ++k) {
  234. if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
  235. continue;
  236. /* Currently only dataChangeNotifications are supported */
  237. if(msg->notificationData[k].content.decoded.type != &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION])
  238. continue;
  239. UA_DataChangeNotification *dataChangeNotification = msg->notificationData[k].content.decoded.data;
  240. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  241. UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
  242. UA_Client_MonitoredItem *mon;
  243. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  244. if(mon->clientHandle == mitemNot->clientHandle) {
  245. mon->handler(mon->monitoredItemId, &mitemNot->value, mon->handlerContext);
  246. break;
  247. }
  248. }
  249. if(!mon)
  250. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  251. "Could not process a notification with clienthandle %u on subscription %u",
  252. mitemNot->clientHandle, sub->subscriptionID);
  253. }
  254. }
  255. /* Add to the list of pending acks */
  256. UA_Client_NotificationsAckNumber *tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  257. if(!tmpAck) {
  258. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  259. "Not enough memory to store the acknowledgement for a "
  260. "publish message on subscription %u", sub->subscriptionID);
  261. return;
  262. }
  263. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  264. tmpAck->subAck.subscriptionId = sub->subscriptionID;
  265. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  266. }
  267. UA_StatusCode
  268. UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
  269. if (client->state == UA_CLIENTSTATE_ERRORED)
  270. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  271. UA_Boolean moreNotifications = true;
  272. while(moreNotifications) {
  273. UA_PublishRequest request;
  274. UA_PublishRequest_init(&request);
  275. request.subscriptionAcknowledgementsSize = 0;
  276. UA_Client_NotificationsAckNumber *ack;
  277. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  278. ++request.subscriptionAcknowledgementsSize;
  279. if(request.subscriptionAcknowledgementsSize > 0) {
  280. request.subscriptionAcknowledgements =
  281. UA_malloc(sizeof(UA_SubscriptionAcknowledgement) * request.subscriptionAcknowledgementsSize);
  282. if(!request.subscriptionAcknowledgements)
  283. return UA_STATUSCODE_GOOD;
  284. }
  285. int i = 0;
  286. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  287. request.subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  288. request.subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  289. ++i;
  290. }
  291. UA_PublishResponse response = UA_Client_Service_publish(client, request);
  292. UA_Client_processPublishResponse(client, &request, &response);
  293. moreNotifications = response.moreNotifications;
  294. UA_PublishResponse_deleteMembers(&response);
  295. UA_PublishRequest_deleteMembers(&request);
  296. }
  297. return UA_STATUSCODE_GOOD;
  298. }
  299. #endif /* UA_ENABLE_SUBSCRIPTIONS */