ua_client_highlevel_subscriptions.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. #include "ua_client_highlevel.h"
  2. #include "ua_client_internal.h"
  3. #include "ua_util.h"
  4. #include "ua_types_generated_encoding_binary.h"
  5. const UA_SubscriptionSettings UA_SubscriptionSettings_standard = {
  6. .requestedPublishingInterval = 0.0,
  7. .requestedLifetimeCount = 100,
  8. .requestedMaxKeepAliveCount = 10,
  9. .maxNotificationsPerPublish = 10,
  10. .publishingEnabled = UA_TRUE,
  11. .priority = 0
  12. };
  13. UA_StatusCode UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
  14. UA_UInt32 *newSubscriptionId) {
  15. UA_CreateSubscriptionRequest request;
  16. UA_CreateSubscriptionRequest_init(&request);
  17. request.requestedPublishingInterval = settings.requestedPublishingInterval;
  18. request.requestedLifetimeCount = settings.requestedLifetimeCount;
  19. request.requestedMaxKeepAliveCount = settings.requestedMaxKeepAliveCount;
  20. request.maxNotificationsPerPublish = settings.maxNotificationsPerPublish;
  21. request.publishingEnabled = settings.publishingEnabled;
  22. request.priority = settings.priority;
  23. UA_CreateSubscriptionResponse response = UA_Client_Service_createSubscription(client, request);
  24. UA_StatusCode retval = response.responseHeader.serviceResult;
  25. if(retval == UA_STATUSCODE_GOOD) {
  26. UA_Client_Subscription *newSub = UA_malloc(sizeof(UA_Client_Subscription));
  27. LIST_INIT(&newSub->MonitoredItems);
  28. newSub->LifeTime = response.revisedLifetimeCount;
  29. newSub->KeepAliveCount = response.revisedMaxKeepAliveCount;
  30. newSub->PublishingInterval = response.revisedPublishingInterval;
  31. newSub->SubscriptionID = response.subscriptionId;
  32. newSub->NotificationsPerPublish = request.maxNotificationsPerPublish;
  33. newSub->Priority = request.priority;
  34. if(newSubscriptionId)
  35. *newSubscriptionId = newSub->SubscriptionID;
  36. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  37. }
  38. UA_CreateSubscriptionResponse_deleteMembers(&response);
  39. return retval;
  40. }
  41. UA_StatusCode UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
  42. UA_Client_Subscription *sub;
  43. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  44. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  45. if(sub->SubscriptionID == subscriptionId)
  46. break;
  47. }
  48. // Problem? We do not have this subscription registeres. Maybe the server should
  49. // be consulted at this point?
  50. if(!sub)
  51. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  52. UA_DeleteSubscriptionsRequest request;
  53. UA_DeleteSubscriptionsRequest_init(&request);
  54. request.subscriptionIdsSize = 1;
  55. request.subscriptionIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
  56. *request.subscriptionIds = sub->SubscriptionID;
  57. UA_Client_MonitoredItem *mon, *tmpmon;
  58. LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmpmon) {
  59. retval |= UA_Client_Subscriptions_removeMonitoredItem(client, sub->SubscriptionID,
  60. mon->MonitoredItemId);
  61. }
  62. if(retval != UA_STATUSCODE_GOOD) {
  63. UA_DeleteSubscriptionsRequest_deleteMembers(&request);
  64. return retval;
  65. }
  66. UA_DeleteSubscriptionsResponse response = UA_Client_Service_deleteSubscriptions(client, request);
  67. if(response.resultsSize > 0)
  68. retval = response.results[0];
  69. else
  70. retval = response.responseHeader.serviceResult;
  71. if(retval == UA_STATUSCODE_GOOD) {
  72. LIST_REMOVE(sub, listEntry);
  73. UA_free(sub);
  74. }
  75. UA_DeleteSubscriptionsRequest_deleteMembers(&request);
  76. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  77. return retval;
  78. }
  79. UA_StatusCode
  80. UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  81. UA_NodeId nodeId, UA_UInt32 attributeID,
  82. void (*handlingFunction)(UA_UInt32 handle, UA_DataValue *value, void *context),
  83. void *handlingContext,
  84. UA_UInt32 *newMonitoredItemId) {
  85. UA_Client_Subscription *sub;
  86. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  87. if(sub->SubscriptionID == subscriptionId)
  88. break;
  89. }
  90. if(!sub)
  91. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  92. UA_CreateMonitoredItemsRequest request;
  93. UA_CreateMonitoredItemsRequest_init(&request);
  94. request.subscriptionId = subscriptionId;
  95. UA_MonitoredItemCreateRequest item;
  96. UA_MonitoredItemCreateRequest_init(&item);
  97. item.itemToMonitor.nodeId = nodeId;
  98. item.itemToMonitor.attributeId = attributeID;
  99. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  100. item.requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  101. item.requestedParameters.samplingInterval = sub->PublishingInterval;
  102. item.requestedParameters.discardOldest = UA_TRUE;
  103. item.requestedParameters.queueSize = 1;
  104. request.itemsToCreate = &item;
  105. request.itemsToCreateSize = 1;
  106. // Filter can be left void for now, only changes are supported (UA_Expert does the same with changeItems)
  107. UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
  108. UA_StatusCode retval;
  109. // slight misuse of retval here to check if the deletion was successfull.
  110. if(response.resultsSize == 0)
  111. retval = response.responseHeader.serviceResult;
  112. else
  113. retval = response.results[0].statusCode;
  114. if(retval == UA_STATUSCODE_GOOD) {
  115. UA_Client_MonitoredItem *newMon = UA_malloc(sizeof(UA_Client_MonitoredItem));
  116. newMon->MonitoringMode = UA_MONITORINGMODE_REPORTING;
  117. UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId);
  118. newMon->AttributeID = attributeID;
  119. newMon->ClientHandle = client->monitoredItemHandles;
  120. newMon->SamplingInterval = sub->PublishingInterval;
  121. newMon->QueueSize = 1;
  122. newMon->DiscardOldest = UA_TRUE;
  123. newMon->handler = handlingFunction;
  124. newMon->handlerContext = handlingContext;
  125. newMon->MonitoredItemId = response.results[0].monitoredItemId;
  126. LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
  127. *newMonitoredItemId = newMon->MonitoredItemId;
  128. }
  129. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  130. return retval;
  131. }
  132. UA_StatusCode
  133. UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  134. UA_UInt32 monitoredItemId) {
  135. UA_Client_Subscription *sub;
  136. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  137. if(sub->SubscriptionID == subscriptionId)
  138. break;
  139. }
  140. if(!sub)
  141. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  142. UA_Client_MonitoredItem *mon;
  143. LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
  144. if(mon->MonitoredItemId == monitoredItemId)
  145. break;
  146. }
  147. if(!mon)
  148. return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
  149. UA_DeleteMonitoredItemsRequest request;
  150. UA_DeleteMonitoredItemsRequest_init(&request);
  151. request.subscriptionId = sub->SubscriptionID;
  152. request.monitoredItemIdsSize = 1;
  153. request.monitoredItemIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
  154. request.monitoredItemIds[0] = mon->MonitoredItemId;
  155. UA_DeleteMonitoredItemsResponse response = UA_Client_Service_deleteMonitoredItems(client, request);
  156. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  157. if(response.resultsSize > 1)
  158. retval = response.results[0];
  159. else
  160. retval = response.responseHeader.serviceResult;
  161. if(retval == UA_STATUSCODE_GOOD) {
  162. LIST_REMOVE(mon, listEntry);
  163. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  164. UA_free(mon);
  165. }
  166. UA_DeleteMonitoredItemsRequest_deleteMembers(&request);
  167. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  168. return retval;
  169. }
  170. static UA_Boolean
  171. UA_Client_processPublishRx(UA_Client *client, UA_PublishResponse response) {
  172. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  173. return UA_FALSE;
  174. // Check if the server has acknowledged any of our ACKS
  175. // Note that a list of serverside status codes may be send without valid publish data, i.e.
  176. // during keepalives or no data availability
  177. UA_Client_NotificationsAckNumber *ack, *tmpAck;
  178. size_t i = 0;
  179. LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, tmpAck) {
  180. if(response.results[i] == UA_STATUSCODE_GOOD ||
  181. response.results[i] == UA_STATUSCODE_BADSEQUENCENUMBERINVALID) {
  182. LIST_REMOVE(ack, listEntry);
  183. UA_free(ack);
  184. }
  185. i++;
  186. }
  187. if(response.subscriptionId == 0)
  188. return UA_FALSE;
  189. UA_Client_Subscription *sub;
  190. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  191. if(sub->SubscriptionID == response.subscriptionId)
  192. break;
  193. }
  194. if(!sub)
  195. return UA_FALSE;
  196. UA_NotificationMessage msg = response.notificationMessage;
  197. UA_Client_MonitoredItem *mon;
  198. for(size_t k = 0; k < msg.notificationDataSize; k++) {
  199. if(msg.notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
  200. continue;
  201. if(msg.notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  202. // This is a dataChangeNotification
  203. UA_DataChangeNotification *dataChangeNotification = msg.notificationData[k].content.decoded.data;
  204. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; j++) {
  205. UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
  206. // find this client handle
  207. LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
  208. if(mon->ClientHandle == mitemNot->clientHandle) {
  209. mon->handler(mon->MonitoredItemId, &mitemNot->value, mon->handlerContext);
  210. break;
  211. }
  212. }
  213. }
  214. continue;
  215. }
  216. /* if(msg.notificationData[k].typeId.namespaceIndex == 0 && */
  217. /* msg.notificationData[k].typeId.identifier.numeric == 820 ) { */
  218. /* //FIXME: This is a statusChangeNotification (not supported yet) */
  219. /* continue; */
  220. /* } */
  221. /* if(msg.notificationData[k].typeId.namespaceIndex == 0 && */
  222. /* msg.notificationData[k].typeId.identifier.numeric == 916 ) { */
  223. /* //FIXME: This is an EventNotification */
  224. /* continue; */
  225. /* } */
  226. }
  227. /* We processed this message, add it to the list of pending acks (but make
  228. sure it's not in the list first) */
  229. LIST_FOREACH(tmpAck, &client->pendingNotificationsAcks, listEntry) {
  230. if(tmpAck->subAck.sequenceNumber == msg.sequenceNumber &&
  231. tmpAck->subAck.subscriptionId == response.subscriptionId)
  232. break;
  233. }
  234. if(!tmpAck) {
  235. tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  236. tmpAck->subAck.sequenceNumber = msg.sequenceNumber;
  237. tmpAck->subAck.subscriptionId = sub->SubscriptionID;
  238. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  239. }
  240. return response.moreNotifications;
  241. }
  242. UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
  243. if (client->state == UA_CLIENTSTATE_ERRORED){
  244. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  245. }
  246. UA_Boolean moreNotifications = UA_TRUE;
  247. do {
  248. UA_PublishRequest request;
  249. UA_PublishRequest_init(&request);
  250. request.subscriptionAcknowledgementsSize = 0;
  251. UA_Client_NotificationsAckNumber *ack;
  252. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  253. request.subscriptionAcknowledgementsSize++;
  254. if(request.subscriptionAcknowledgementsSize > 0) {
  255. request.subscriptionAcknowledgements = UA_malloc(sizeof(UA_SubscriptionAcknowledgement) *
  256. request.subscriptionAcknowledgementsSize);
  257. if(!request.subscriptionAcknowledgements)
  258. return UA_STATUSCODE_GOOD;
  259. }
  260. int index = 0 ;
  261. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  262. request.subscriptionAcknowledgements[index].sequenceNumber = ack->subAck.sequenceNumber;
  263. request.subscriptionAcknowledgements[index].subscriptionId = ack->subAck.subscriptionId;
  264. index++;
  265. }
  266. UA_PublishResponse response = UA_Client_Service_publish(client, request);
  267. if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
  268. moreNotifications = UA_Client_processPublishRx(client, response);
  269. else
  270. moreNotifications = UA_FALSE;
  271. UA_PublishResponse_deleteMembers(&response);
  272. UA_PublishRequest_deleteMembers(&request);
  273. } while(moreNotifications == UA_TRUE);
  274. return UA_STATUSCODE_GOOD;
  275. }