123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- #include "ua_client_highlevel.h"
- #include "ua_client_internal.h"
- #include "ua_util.h"
- #include "ua_types_generated_encoding_binary.h"
- const UA_SubscriptionSettings UA_SubscriptionSettings_standard = {
- .requestedPublishingInterval = 0.0,
- .requestedLifetimeCount = 100,
- .requestedMaxKeepAliveCount = 10,
- .maxNotificationsPerPublish = 10,
- .publishingEnabled = UA_TRUE,
- .priority = 0
- };
- UA_StatusCode UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
- UA_UInt32 *newSubscriptionId) {
- UA_CreateSubscriptionRequest request;
- UA_CreateSubscriptionRequest_init(&request);
- request.requestedPublishingInterval = settings.requestedPublishingInterval;
- request.requestedLifetimeCount = settings.requestedLifetimeCount;
- request.requestedMaxKeepAliveCount = settings.requestedMaxKeepAliveCount;
- request.maxNotificationsPerPublish = settings.maxNotificationsPerPublish;
- request.publishingEnabled = settings.publishingEnabled;
- request.priority = settings.priority;
-
- UA_CreateSubscriptionResponse response = UA_Client_Service_createSubscription(client, request);
- UA_StatusCode retval = response.responseHeader.serviceResult;
- if(retval == UA_STATUSCODE_GOOD) {
- UA_Client_Subscription *newSub = UA_malloc(sizeof(UA_Client_Subscription));
- LIST_INIT(&newSub->MonitoredItems);
- newSub->LifeTime = response.revisedLifetimeCount;
- newSub->KeepAliveCount = response.revisedMaxKeepAliveCount;
- newSub->PublishingInterval = response.revisedPublishingInterval;
- newSub->SubscriptionID = response.subscriptionId;
- newSub->NotificationsPerPublish = request.maxNotificationsPerPublish;
- newSub->Priority = request.priority;
- if(newSubscriptionId)
- *newSubscriptionId = newSub->SubscriptionID;
- LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
- }
-
- UA_CreateSubscriptionResponse_deleteMembers(&response);
- return retval;
- }
- UA_StatusCode UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
- UA_Client_Subscription *sub;
- UA_StatusCode retval = UA_STATUSCODE_GOOD;
-
- LIST_FOREACH(sub, &client->subscriptions, listEntry) {
- if(sub->SubscriptionID == subscriptionId)
- break;
- }
-
- // Problem? We do not have this subscription registeres. Maybe the server should
- // be consulted at this point?
- if(!sub)
- return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-
- UA_DeleteSubscriptionsRequest request;
- UA_DeleteSubscriptionsRequest_init(&request);
- request.subscriptionIdsSize = 1;
- request.subscriptionIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
- *request.subscriptionIds = sub->SubscriptionID;
-
- UA_Client_MonitoredItem *mon, *tmpmon;
- LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmpmon) {
- retval |= UA_Client_Subscriptions_removeMonitoredItem(client, sub->SubscriptionID,
- mon->MonitoredItemId);
- }
- if(retval != UA_STATUSCODE_GOOD){
- UA_DeleteSubscriptionsRequest_deleteMembers(&request);
- return retval;
- }
-
- UA_DeleteSubscriptionsResponse response = UA_Client_Service_deleteSubscriptions(client, request);
- if(response.resultsSize > 0)
- retval = response.results[0];
- else
- retval = response.responseHeader.serviceResult;
-
- if(retval == UA_STATUSCODE_GOOD) {
- LIST_REMOVE(sub, listEntry);
- UA_free(sub);
- }
- UA_DeleteSubscriptionsRequest_deleteMembers(&request);
- UA_DeleteSubscriptionsResponse_deleteMembers(&response);
- return retval;
- }
- UA_StatusCode
- UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
- UA_NodeId nodeId, UA_UInt32 attributeID,
- void *handlingFunction, UA_UInt32 *newMonitoredItemId) {
- UA_Client_Subscription *sub;
- LIST_FOREACH(sub, &client->subscriptions, listEntry) {
- if(sub->SubscriptionID == subscriptionId)
- break;
- }
- if(!sub)
- return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-
- UA_CreateMonitoredItemsRequest request;
- UA_CreateMonitoredItemsRequest_init(&request);
- request.subscriptionId = subscriptionId;
- UA_MonitoredItemCreateRequest item;
- UA_MonitoredItemCreateRequest_init(&item);
- item.itemToMonitor.nodeId = nodeId;
- item.itemToMonitor.attributeId = attributeID;
- item.monitoringMode = UA_MONITORINGMODE_REPORTING;
- item.requestedParameters.clientHandle = ++(client->monitoredItemHandles);
- item.requestedParameters.samplingInterval = sub->PublishingInterval;
- item.requestedParameters.discardOldest = UA_TRUE;
- item.requestedParameters.queueSize = 1;
- request.itemsToCreate = &item;
- request.itemsToCreateSize = 1;
- // Filter can be left void for now, only changes are supported (UA_Expert does the same with changeItems)
-
- UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
-
- UA_StatusCode retval;
- // slight misuse of retval here to check if the deletion was successfull.
- if(response.resultsSize == 0)
- retval = response.responseHeader.serviceResult;
- else
- retval = response.results[0].statusCode;
-
- if(retval == UA_STATUSCODE_GOOD) {
- UA_Client_MonitoredItem *newMon = UA_malloc(sizeof(UA_Client_MonitoredItem));
- newMon->MonitoringMode = UA_MONITORINGMODE_REPORTING;
- UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId);
- newMon->AttributeID = attributeID;
- newMon->ClientHandle = client->monitoredItemHandles;
- newMon->SamplingInterval = sub->PublishingInterval;
- newMon->QueueSize = 1;
- newMon->DiscardOldest = UA_TRUE;
- newMon->handler = handlingFunction;
- newMon->MonitoredItemId = response.results[0].monitoredItemId;
- LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
- *newMonitoredItemId = newMon->MonitoredItemId;
- }
-
- UA_CreateMonitoredItemsResponse_deleteMembers(&response);
- return retval;
- }
- UA_StatusCode
- UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
- UA_UInt32 monitoredItemId) {
- UA_Client_Subscription *sub;
- LIST_FOREACH(sub, &client->subscriptions, listEntry) {
- if(sub->SubscriptionID == subscriptionId)
- break;
- }
- if(!sub)
- return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-
- UA_Client_MonitoredItem *mon;
- LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
- if(mon->MonitoredItemId == monitoredItemId)
- break;
- }
- if(!mon)
- return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
-
- UA_DeleteMonitoredItemsRequest request;
- UA_DeleteMonitoredItemsRequest_init(&request);
- request.subscriptionId = sub->SubscriptionID;
- request.monitoredItemIdsSize = 1;
- request.monitoredItemIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
- request.monitoredItemIds[0] = mon->MonitoredItemId;
-
- UA_DeleteMonitoredItemsResponse response = UA_Client_Service_deleteMonitoredItems(client, request);
- UA_StatusCode retval = UA_STATUSCODE_GOOD;
- if(response.resultsSize > 1)
- retval = response.results[0];
- else
- retval = response.responseHeader.serviceResult;
-
- if(retval == UA_STATUSCODE_GOOD) {
- LIST_REMOVE(mon, listEntry);
- UA_NodeId_deleteMembers(&mon->monitoredNodeId);
- UA_free(mon);
- }
-
- UA_DeleteMonitoredItemsRequest_deleteMembers(&request);
- UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
- return retval;
- }
- static UA_Boolean
- UA_Client_processPublishRx(UA_Client *client, UA_PublishResponse response) {
- if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
- return UA_FALSE;
-
- // Check if the server has acknowledged any of our ACKS
- // Note that a list of serverside status codes may be send without valid publish data, i.e.
- // during keepalives or no data availability
- UA_Client_NotificationsAckNumber *ack, *tmpAck;
- size_t i = 0;
- LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, tmpAck) {
- if(response.results[i] == UA_STATUSCODE_GOOD ||
- response.results[i] == UA_STATUSCODE_BADSEQUENCENUMBERINVALID) {
- LIST_REMOVE(ack, listEntry);
- UA_free(ack);
- }
- i++;
- }
-
- if(response.subscriptionId == 0)
- return UA_FALSE;
-
- UA_Client_Subscription *sub;
- LIST_FOREACH(sub, &client->subscriptions, listEntry) {
- if(sub->SubscriptionID == response.subscriptionId)
- break;
- }
- if(!sub)
- return UA_FALSE;
-
- UA_NotificationMessage msg = response.notificationMessage;
- UA_Client_MonitoredItem *mon;
- for(size_t k = 0; k < msg.notificationDataSize; k++) {
- if(msg.notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
- continue;
-
- if(msg.notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
- // This is a dataChangeNotification
- UA_DataChangeNotification *dataChangeNotification = msg.notificationData[k].content.decoded.data;
- for(size_t i = 0; i < dataChangeNotification->monitoredItemsSize; i++) {
- UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[i];
- // find this client handle
- LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
- if(mon->ClientHandle == mitemNot->clientHandle) {
- mon->handler(mitemNot->clientHandle, &mitemNot->value);
- break;
- }
- }
- }
- continue;
- }
- /* if(msg.notificationData[k].typeId.namespaceIndex == 0 && */
- /* msg.notificationData[k].typeId.identifier.numeric == 820 ) { */
- /* //FIXME: This is a statusChangeNotification (not supported yet) */
- /* continue; */
- /* } */
- /* if(msg.notificationData[k].typeId.namespaceIndex == 0 && */
- /* msg.notificationData[k].typeId.identifier.numeric == 916 ) { */
- /* //FIXME: This is an EventNotification */
- /* continue; */
- /* } */
- }
-
- /* We processed this message, add it to the list of pending acks (but make
- sure it's not in the list first) */
- LIST_FOREACH(tmpAck, &client->pendingNotificationsAcks, listEntry) {
- if(tmpAck->subAck.sequenceNumber == msg.sequenceNumber &&
- tmpAck->subAck.subscriptionId == response.subscriptionId)
- break;
- }
- if(!tmpAck) {
- tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
- tmpAck->subAck.sequenceNumber = msg.sequenceNumber;
- tmpAck->subAck.subscriptionId = sub->SubscriptionID;
- LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
- }
-
- return response.moreNotifications;
- }
- void UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
- UA_Boolean moreNotifications = UA_TRUE;
- do {
- UA_PublishRequest request;
- UA_PublishRequest_init(&request);
- request.subscriptionAcknowledgementsSize = 0;
- UA_Client_NotificationsAckNumber *ack;
- LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
- request.subscriptionAcknowledgementsSize++;
- request.subscriptionAcknowledgements = UA_malloc(sizeof(UA_SubscriptionAcknowledgement) *
- request.subscriptionAcknowledgementsSize);
-
- int index = 0 ;
- LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
- request.subscriptionAcknowledgements[index].sequenceNumber = ack->subAck.sequenceNumber;
- request.subscriptionAcknowledgements[index].subscriptionId = ack->subAck.subscriptionId;
- index++;
- }
-
- UA_PublishResponse response = UA_Client_Service_publish(client, request);
- if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
- moreNotifications = UA_Client_processPublishRx(client, response);
- else
- moreNotifications = UA_FALSE;
-
- UA_PublishResponse_deleteMembers(&response);
- UA_PublishRequest_deleteMembers(&request);
- } while(moreNotifications == UA_TRUE);
- }
|