ua_client_highlevel_subscriptions.c 13 KB


  1. #include "ua_client_highlevel.h"
  2. #include "ua_client_internal.h"
  3. #include "ua_util.h"
  4. #include "ua_types_generated_encoding_binary.h"
  5. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  6. const UA_SubscriptionSettings UA_SubscriptionSettings_standard = {
  7. .requestedPublishingInterval = 500.0,
  8. .requestedLifetimeCount = 10000,
  9. .requestedMaxKeepAliveCount = 1,
  10. .maxNotificationsPerPublish = 10,
  11. .publishingEnabled = true,
  12. .priority = 0
  13. };
  14. UA_StatusCode UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
  15. UA_UInt32 *newSubscriptionId) {
  16. UA_CreateSubscriptionRequest request;
  17. UA_CreateSubscriptionRequest_init(&request);
  18. request.requestedPublishingInterval = settings.requestedPublishingInterval;
  19. request.requestedLifetimeCount = settings.requestedLifetimeCount;
  20. request.requestedMaxKeepAliveCount = settings.requestedMaxKeepAliveCount;
  21. request.maxNotificationsPerPublish = settings.maxNotificationsPerPublish;
  22. request.publishingEnabled = settings.publishingEnabled;
  23. request.priority = settings.priority;
  24. UA_CreateSubscriptionResponse response = UA_Client_Service_createSubscription(client, request);
  25. UA_StatusCode retval = response.responseHeader.serviceResult;
  26. if(retval == UA_STATUSCODE_GOOD) {
  27. UA_Client_Subscription *newSub = UA_malloc(sizeof(UA_Client_Subscription));
  28. LIST_INIT(&newSub->MonitoredItems);
  29. newSub->LifeTime = response.revisedLifetimeCount;
  30. newSub->KeepAliveCount = response.revisedMaxKeepAliveCount;
  31. newSub->PublishingInterval = response.revisedPublishingInterval;
  32. newSub->SubscriptionID = response.subscriptionId;
  33. newSub->NotificationsPerPublish = request.maxNotificationsPerPublish;
  34. newSub->Priority = request.priority;
  35. if(newSubscriptionId)
  36. *newSubscriptionId = newSub->SubscriptionID;
  37. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  38. }
  39. UA_CreateSubscriptionResponse_deleteMembers(&response);
  40. return retval;
  41. }
  42. UA_StatusCode UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
  43. UA_Client_Subscription *sub;
  44. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  45. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  46. if(sub->SubscriptionID == subscriptionId)
  47. break;
  48. }
  49. // Problem? We do not have this subscription registeres. Maybe the server should
  50. // be consulted at this point?
  51. if(!sub)
  52. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  53. UA_DeleteSubscriptionsRequest request;
  54. UA_DeleteSubscriptionsRequest_init(&request);
  55. request.subscriptionIdsSize = 1;
  56. request.subscriptionIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
  57. *request.subscriptionIds = sub->SubscriptionID;
  58. UA_Client_MonitoredItem *mon, *tmpmon;
  59. LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmpmon) {
  60. retval |= UA_Client_Subscriptions_removeMonitoredItem(client, sub->SubscriptionID,
  61. mon->MonitoredItemId);
  62. }
  63. if(retval != UA_STATUSCODE_GOOD) {
  64. UA_DeleteSubscriptionsRequest_deleteMembers(&request);
  65. return retval;
  66. }
  67. UA_DeleteSubscriptionsResponse response = UA_Client_Service_deleteSubscriptions(client, request);
  68. if(response.resultsSize > 0)
  69. retval = response.results[0];
  70. else
  71. retval = response.responseHeader.serviceResult;
  72. if(retval == UA_STATUSCODE_GOOD) {
  73. LIST_REMOVE(sub, listEntry);
  74. UA_free(sub);
  75. }
  76. UA_DeleteSubscriptionsRequest_deleteMembers(&request);
  77. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  78. return retval;
  79. }
  80. UA_StatusCode
  81. UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  82. UA_NodeId nodeId, UA_UInt32 attributeID,
  83. UA_MonitoredItemHandlingFunction handlingFunction,
  84. void *handlingContext, UA_UInt32 *newMonitoredItemId) {
  85. UA_Client_Subscription *sub;
  86. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  87. if(sub->SubscriptionID == subscriptionId)
  88. break;
  89. }
  90. if(!sub)
  91. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  92. /* Send the request */
  93. UA_CreateMonitoredItemsRequest request;
  94. UA_CreateMonitoredItemsRequest_init(&request);
  95. request.subscriptionId = subscriptionId;
  96. UA_MonitoredItemCreateRequest item;
  97. UA_MonitoredItemCreateRequest_init(&item);
  98. item.itemToMonitor.nodeId = nodeId;
  99. item.itemToMonitor.attributeId = attributeID;
  100. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  101. item.requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  102. item.requestedParameters.samplingInterval = sub->PublishingInterval;
  103. item.requestedParameters.discardOldest = true;
  104. item.requestedParameters.queueSize = 1;
  105. request.itemsToCreate = &item;
  106. request.itemsToCreateSize = 1;
  107. UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
  108. // slight misuse of retval here to check if the deletion was successfull.
  109. UA_StatusCode retval;
  110. if(response.resultsSize == 0)
  111. retval = response.responseHeader.serviceResult;
  112. else
  113. retval = response.results[0].statusCode;
  114. if(retval != UA_STATUSCODE_GOOD) {
  115. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  116. return retval;
  117. }
  118. /* Create the handler */
  119. UA_Client_MonitoredItem *newMon = UA_malloc(sizeof(UA_Client_MonitoredItem));
  120. newMon->MonitoringMode = UA_MONITORINGMODE_REPORTING;
  121. UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId);
  122. newMon->AttributeID = attributeID;
  123. newMon->ClientHandle = client->monitoredItemHandles;
  124. newMon->SamplingInterval = sub->PublishingInterval;
  125. newMon->QueueSize = 1;
  126. newMon->DiscardOldest = true;
  127. newMon->handler = handlingFunction;
  128. newMon->handlerContext = handlingContext;
  129. newMon->MonitoredItemId = response.results[0].monitoredItemId;
  130. LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
  131. *newMonitoredItemId = newMon->MonitoredItemId;
  132. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  133. "Created a monitored item with client handle %u", client->monitoredItemHandles);
  134. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  135. return UA_STATUSCODE_GOOD;
  136. }
  137. UA_StatusCode
  138. UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  139. UA_UInt32 monitoredItemId) {
  140. UA_Client_Subscription *sub;
  141. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  142. if(sub->SubscriptionID == subscriptionId)
  143. break;
  144. }
  145. if(!sub)
  146. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  147. UA_Client_MonitoredItem *mon;
  148. LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
  149. if(mon->MonitoredItemId == monitoredItemId)
  150. break;
  151. }
  152. if(!mon)
  153. return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
  154. UA_DeleteMonitoredItemsRequest request;
  155. UA_DeleteMonitoredItemsRequest_init(&request);
  156. request.subscriptionId = sub->SubscriptionID;
  157. request.monitoredItemIdsSize = 1;
  158. request.monitoredItemIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
  159. request.monitoredItemIds[0] = mon->MonitoredItemId;
  160. UA_DeleteMonitoredItemsResponse response = UA_Client_Service_deleteMonitoredItems(client, request);
  161. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  162. if(response.resultsSize > 1)
  163. retval = response.results[0];
  164. else
  165. retval = response.responseHeader.serviceResult;
  166. if(retval == UA_STATUSCODE_GOOD) {
  167. LIST_REMOVE(mon, listEntry);
  168. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  169. UA_free(mon);
  170. }
  171. UA_DeleteMonitoredItemsRequest_deleteMembers(&request);
  172. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  173. return retval;
  174. }
  175. static void
  176. UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request, UA_PublishResponse *response) {
  177. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  178. return;
  179. /* Find the subscription */
  180. UA_Client_Subscription *sub;
  181. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  182. if(sub->SubscriptionID == response->subscriptionId)
  183. break;
  184. }
  185. if(!sub)
  186. return;
  187. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  188. "Processing a publish response on subscription %u with %u notifications",
  189. sub->SubscriptionID, response->notificationMessage.notificationDataSize);
  190. /* Check if the server has acknowledged any of the sent ACKs */
  191. for(size_t i = 0; i < response->resultsSize && i < request->subscriptionAcknowledgementsSize; i++) {
  192. /* remove also acks that are unknown to the server */
  193. if(response->results[i] != UA_STATUSCODE_GOOD &&
  194. response->results[i] != UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN)
  195. continue;
  196. /* Remove the ack from the list */
  197. UA_SubscriptionAcknowledgement *orig_ack = &request->subscriptionAcknowledgements[i];
  198. UA_Client_NotificationsAckNumber *ack;
  199. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  200. if(ack->subAck.subscriptionId != orig_ack->subscriptionId ||
  201. ack->subAck.sequenceNumber != orig_ack->sequenceNumber)
  202. continue;
  203. LIST_REMOVE(ack, listEntry);
  204. UA_free(ack);
  205. }
  206. }
  207. /* Process the notification messages */
  208. UA_NotificationMessage *msg = &response->notificationMessage;
  209. for(size_t k = 0; k < msg->notificationDataSize; k++) {
  210. if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
  211. continue;
  212. /* Currently only dataChangeNotifications are supported */
  213. if(msg->notificationData[k].content.decoded.type != &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION])
  214. continue;
  215. UA_DataChangeNotification *dataChangeNotification = msg->notificationData[k].content.decoded.data;
  216. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; j++) {
  217. UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
  218. UA_Client_MonitoredItem *mon;
  219. LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
  220. if(mon->ClientHandle == mitemNot->clientHandle) {
  221. mon->handler(mon->MonitoredItemId, &mitemNot->value, mon->handlerContext);
  222. break;
  223. }
  224. }
  225. if(!mon)
  226. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  227. "Could not process a notification with clienthandle %u on subscription %u",
  228. mitemNot->clientHandle, sub->SubscriptionID);
  229. }
  230. }
  231. /* Add to the list of pending acks */
  232. UA_Client_NotificationsAckNumber *tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  233. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  234. tmpAck->subAck.subscriptionId = sub->SubscriptionID;
  235. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  236. }
  237. UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
  238. if (client->state == UA_CLIENTSTATE_ERRORED)
  239. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  240. UA_Boolean moreNotifications = true;
  241. while(moreNotifications) {
  242. UA_PublishRequest request;
  243. UA_PublishRequest_init(&request);
  244. request.subscriptionAcknowledgementsSize = 0;
  245. UA_Client_NotificationsAckNumber *ack;
  246. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  247. request.subscriptionAcknowledgementsSize++;
  248. if(request.subscriptionAcknowledgementsSize > 0) {
  249. request.subscriptionAcknowledgements =
  250. UA_malloc(sizeof(UA_SubscriptionAcknowledgement) * request.subscriptionAcknowledgementsSize);
  251. if(!request.subscriptionAcknowledgements)
  252. return UA_STATUSCODE_GOOD;
  253. }
  254. int index = 0;
  255. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  256. request.subscriptionAcknowledgements[index].sequenceNumber = ack->subAck.sequenceNumber;
  257. request.subscriptionAcknowledgements[index].subscriptionId = ack->subAck.subscriptionId;
  258. index++;
  259. }
  260. UA_PublishResponse response = UA_Client_Service_publish(client, request);
  261. UA_Client_processPublishResponse(client, &request, &response);
  262. moreNotifications = response.moreNotifications;
  263. UA_PublishResponse_deleteMembers(&response);
  264. UA_PublishRequest_deleteMembers(&request);
  265. }
  266. return UA_STATUSCODE_GOOD;
  267. }
  268. #endif /* UA_ENABLE_SUBSCRIPTIONS */