/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer) * Copyright 2015 (c) Oleksiy Vasylyev * Copyright 2016 (c) Sten Grüner * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA * Copyright 2016-2017 (c) Florian Palm * Copyright 2017 (c) Frank Meerkötter * Copyright 2017 (c) Stefan Profanter, fortiss GmbH */ #include "ua_client_highlevel.h" #include "ua_client_internal.h" #include "ua_util.h" #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */ /*****************/ /* Subscriptions */ /*****************/ UA_CreateSubscriptionResponse UA_EXPORT UA_Client_Subscriptions_create(UA_Client *client, const UA_CreateSubscriptionRequest request, void *subscriptionContext, UA_Client_StatusChangeNotificationCallback statusChangeCallback, UA_Client_DeleteSubscriptionCallback deleteCallback) { UA_CreateSubscriptionResponse response; UA_CreateSubscriptionResponse_init(&response); /* Allocate the internal representation */ UA_Client_Subscription *newSub = (UA_Client_Subscription*) UA_malloc(sizeof(UA_Client_Subscription)); if(!newSub) { response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; return response; } /* Send the request as a synchronous service call */ __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST], &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]); if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) { UA_free(newSub); return response; } /* Prepare the internal representation */ newSub->context = subscriptionContext; newSub->subscriptionId = response.subscriptionId; newSub->sequenceNumber = 0; newSub->lastActivity = UA_DateTime_nowMonotonic(); newSub->statusChangeCallback = statusChangeCallback; newSub->deleteCallback = deleteCallback; newSub->publishingInterval = response.revisedPublishingInterval; newSub->maxKeepAliveCount = response.revisedMaxKeepAliveCount; LIST_INIT(&newSub->monitoredItems); LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry); return response; } static UA_Client_Subscription * findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) { UA_Client_Subscription *sub = NULL; LIST_FOREACH(sub, &client->subscriptions, listEntry) { if(sub->subscriptionId == subscriptionId) break; } return sub; } UA_ModifySubscriptionResponse UA_EXPORT UA_Client_Subscriptions_modify(UA_Client *client, const UA_ModifySubscriptionRequest request) { UA_ModifySubscriptionResponse response; UA_ModifySubscriptionResponse_init(&response); /* Find the internal representation */ UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId); if(!sub) { response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; return response; } /* Call the service */ __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST], &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]); /* Adjust the internal representation */ sub->publishingInterval = response.revisedPublishingInterval; sub->maxKeepAliveCount = response.revisedMaxKeepAliveCount; return response; } static void UA_Client_Subscription_deleteInternal(UA_Client *client, UA_Client_Subscription *sub) { /* Remove the MonitoredItems */ UA_Client_MonitoredItem *mon, *mon_tmp; LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) UA_Client_MonitoredItem_remove(client, sub, mon); /* Call the delete callback */ if(sub->deleteCallback) sub->deleteCallback(client, sub->subscriptionId, sub->context); /* Remove */ LIST_REMOVE(sub, listEntry); UA_free(sub); } UA_DeleteSubscriptionsResponse UA_EXPORT UA_Client_Subscriptions_delete(UA_Client *client, const UA_DeleteSubscriptionsRequest request) { UA_STACKARRAY(UA_Client_Subscription*, subs, request.subscriptionIdsSize); memset(subs, 0, sizeof(void*) * request.subscriptionIdsSize); /* temporary remove the subscriptions from the list */ for(size_t i = 0; i < request.subscriptionIdsSize; i++) { subs[i] = findSubscription(client, request.subscriptionIds[i]); if (subs[i]) LIST_REMOVE(subs[i], listEntry); } /* Send the request */ UA_DeleteSubscriptionsResponse response; __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST], &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]); if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) goto cleanup; if(request.subscriptionIdsSize != response.resultsSize) { response.responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR; goto cleanup; } /* Loop over the removed subscriptions and remove internally */ for(size_t i = 0; i < request.subscriptionIdsSize; i++) { if(response.results[i] != UA_STATUSCODE_GOOD && response.results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) { /* Something was wrong, reinsert the subscription in the list */ if (subs[i]) LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry); continue; } if(!subs[i]) { UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT, "No internal representation of subscription %u", request.subscriptionIds[i]); continue; } else { LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry); } UA_Client_Subscription_deleteInternal(client, subs[i]); } return response; cleanup: for(size_t i = 0; i < request.subscriptionIdsSize; i++) { if (subs[i]) { LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry); } } return response; } UA_StatusCode UA_EXPORT UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) { UA_DeleteSubscriptionsRequest request; UA_DeleteSubscriptionsRequest_init(&request); request.subscriptionIds = &subscriptionId; request.subscriptionIdsSize = 1; UA_DeleteSubscriptionsResponse response = UA_Client_Subscriptions_delete(client, request); UA_StatusCode retval = response.responseHeader.serviceResult; if(retval != UA_STATUSCODE_GOOD) { UA_DeleteSubscriptionsResponse_deleteMembers(&response); return retval; } if(response.resultsSize != 1) { UA_DeleteSubscriptionsResponse_deleteMembers(&response); return UA_STATUSCODE_BADINTERNALERROR; } retval = response.results[0]; UA_DeleteSubscriptionsResponse_deleteMembers(&response); return retval; } /******************/ /* MonitoredItems */ /******************/ void UA_Client_MonitoredItem_remove(UA_Client *client, UA_Client_Subscription *sub, UA_Client_MonitoredItem *mon) { LIST_REMOVE(mon, listEntry); if(mon->deleteCallback) mon->deleteCallback(client, sub->subscriptionId, sub->context, mon->monitoredItemId, mon->context); UA_free(mon); } static void __UA_Client_MonitoredItems_create(UA_Client *client, const UA_CreateMonitoredItemsRequest *request, void **contexts, void **handlingCallbacks, UA_Client_DeleteMonitoredItemCallback *deleteCallbacks, UA_CreateMonitoredItemsResponse *response) { UA_CreateMonitoredItemsResponse_init(response); if (!request->itemsToCreateSize) { response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR; return; } /* Fix clang warning */ size_t itemsToCreateSize = request->itemsToCreateSize; UA_Client_Subscription *sub = NULL; /* Allocate the memory for internal representations */ UA_STACKARRAY(UA_Client_MonitoredItem*, mis, itemsToCreateSize); memset(mis, 0, sizeof(void*) * itemsToCreateSize); for(size_t i = 0; i < itemsToCreateSize; i++) { mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem)); if(!mis[i]) { response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY; goto cleanup; } } /* Get the subscription */ sub = findSubscription(client, request->subscriptionId); if(!sub) { response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID; goto cleanup; } /* Set the clientHandle */ for(size_t i = 0; i < itemsToCreateSize; i++) request->itemsToCreate[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles); /* Call the service */ __UA_Client_Service(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST], response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]); if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) goto cleanup; if(response->resultsSize != itemsToCreateSize) { response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR; goto cleanup; } /* Add internally */ for(size_t i = 0; i < itemsToCreateSize; i++) { if(response->results[i].statusCode != UA_STATUSCODE_GOOD) { if (deleteCallbacks[i]) deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]); UA_free(mis[i]); mis[i] = NULL; continue; } UA_Client_MonitoredItem *newMon = mis[i]; newMon->clientHandle = request->itemsToCreate[i].requestedParameters.clientHandle; newMon->monitoredItemId = response->results[i].monitoredItemId; newMon->context = contexts[i]; newMon->deleteCallback = deleteCallbacks[i]; newMon->handler.dataChangeCallback = (UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallbacks[i]; newMon->isEventMonitoredItem = (request->itemsToCreate[i].itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER); LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry); } return; cleanup: for(size_t i = 0; i < itemsToCreateSize; i++) { if (deleteCallbacks[i]) { if (sub) deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]); else deleteCallbacks[i](client, 0, NULL, 0, contexts[i]); } if(mis[i]) UA_free(mis[i]); } } UA_CreateMonitoredItemsResponse UA_EXPORT UA_Client_MonitoredItems_createDataChanges(UA_Client *client, const UA_CreateMonitoredItemsRequest request, void **contexts, UA_Client_DataChangeNotificationCallback *callbacks, UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) { UA_CreateMonitoredItemsResponse response; __UA_Client_MonitoredItems_create(client, &request, contexts, (void**)(uintptr_t)callbacks, deleteCallbacks, &response); return response; } UA_MonitoredItemCreateResult UA_EXPORT UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId, UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item, void *context, UA_Client_DataChangeNotificationCallback callback, UA_Client_DeleteMonitoredItemCallback deleteCallback) { UA_CreateMonitoredItemsRequest request; UA_CreateMonitoredItemsRequest_init(&request); request.subscriptionId = subscriptionId; request.timestampsToReturn = timestampsToReturn; request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item; request.itemsToCreateSize = 1; UA_CreateMonitoredItemsResponse response = UA_Client_MonitoredItems_createDataChanges(client, request, &context, &callback, &deleteCallback); UA_MonitoredItemCreateResult result; UA_MonitoredItemCreateResult_init(&result); if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) result.statusCode = response.responseHeader.serviceResult; if(result.statusCode == UA_STATUSCODE_GOOD && response.resultsSize != 1) result.statusCode = UA_STATUSCODE_BADINTERNALERROR; if(result.statusCode == UA_STATUSCODE_GOOD) UA_MonitoredItemCreateResult_copy(&response.results[0] , &result); UA_CreateMonitoredItemsResponse_deleteMembers(&response); return result; } UA_CreateMonitoredItemsResponse UA_EXPORT UA_Client_MonitoredItems_createEvents(UA_Client *client, const UA_CreateMonitoredItemsRequest request, void **contexts, UA_Client_EventNotificationCallback *callbacks, UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) { UA_CreateMonitoredItemsResponse response; __UA_Client_MonitoredItems_create(client, &request, contexts, (void**)(uintptr_t)callbacks, deleteCallbacks, &response); return response; } UA_MonitoredItemCreateResult UA_EXPORT UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId, UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item, void *context, UA_Client_EventNotificationCallback callback, UA_Client_DeleteMonitoredItemCallback deleteCallback) { UA_CreateMonitoredItemsRequest request; UA_CreateMonitoredItemsRequest_init(&request); request.subscriptionId = subscriptionId; request.timestampsToReturn = timestampsToReturn; request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item; request.itemsToCreateSize = 1; UA_CreateMonitoredItemsResponse response = UA_Client_MonitoredItems_createEvents(client, request, &context, &callback, &deleteCallback); UA_MonitoredItemCreateResult result; UA_MonitoredItemCreateResult_copy(response.results , &result); UA_CreateMonitoredItemsResponse_deleteMembers(&response); return result; } UA_DeleteMonitoredItemsResponse UA_EXPORT UA_Client_MonitoredItems_delete(UA_Client *client, const UA_DeleteMonitoredItemsRequest request) { /* Send the request */ UA_DeleteMonitoredItemsResponse response; __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST], &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]); if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) return response; UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId); if(!sub) { UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT, "No internal representation of subscription %u", request.subscriptionId); return response; } /* Loop over deleted MonitoredItems */ for(size_t i = 0; i < response.resultsSize; i++) { if(response.results[i] != UA_STATUSCODE_GOOD && response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) { continue; } #ifndef __clang_analyzer__ /* Delete the internal representation */ UA_Client_MonitoredItem *mon; LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { if(mon->monitoredItemId == request.monitoredItemIds[i]) { UA_Client_MonitoredItem_remove(client, sub, mon); break; } } #endif } return response; } UA_StatusCode UA_EXPORT UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId) { UA_DeleteMonitoredItemsRequest request; UA_DeleteMonitoredItemsRequest_init(&request); request.subscriptionId = subscriptionId; request.monitoredItemIds = &monitoredItemId; request.monitoredItemIdsSize = 1; UA_DeleteMonitoredItemsResponse response = UA_Client_MonitoredItems_delete(client, request); UA_StatusCode retval = response.responseHeader.serviceResult; if(retval != UA_STATUSCODE_GOOD) { UA_DeleteMonitoredItemsResponse_deleteMembers(&response); return retval; } if(response.resultsSize != 1) { UA_DeleteMonitoredItemsResponse_deleteMembers(&response); return UA_STATUSCODE_BADINTERNALERROR; } retval = response.results[0]; UA_DeleteMonitoredItemsResponse_deleteMembers(&response); return retval; } /*************************************/ /* Async Processing of Notifications */ /*************************************/ /* Assume the request is already initialized */ UA_StatusCode UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) { /* Count acks */ UA_Client_NotificationsAckNumber *ack; LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) ++request->subscriptionAcknowledgementsSize; /* Create the array. Returns a sentinel pointer if the length is zero. */ request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*) UA_Array_new(request->subscriptionAcknowledgementsSize, &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]); if(!request->subscriptionAcknowledgements) { request->subscriptionAcknowledgementsSize = 0; return UA_STATUSCODE_BADOUTOFMEMORY; } size_t i = 0; UA_Client_NotificationsAckNumber *ack_tmp; LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) { request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber; request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId; ++i; LIST_REMOVE(ack, listEntry); UA_free(ack); } return UA_STATUSCODE_GOOD; } /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */ /* The value 0 is never used for the sequence number */ static UA_UInt32 UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) { UA_UInt32 nextSequenceNumber = sequenceNumber + 1; if(nextSequenceNumber == 0) nextSequenceNumber = 1; return nextSequenceNumber; } static void processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub, UA_DataChangeNotification *dataChangeNotification) { for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) { UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j]; /* Find the MonitoredItem */ UA_Client_MonitoredItem *mon; LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { if(mon->clientHandle == min->clientHandle) break; } if(!mon) { UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT, "Could not process a notification with clienthandle %u on subscription %u", min->clientHandle, sub->subscriptionId); continue; } if(mon->isEventMonitoredItem) { UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT, "MonitoredItem is configured for Events. But received a " "DataChangeNotification."); continue; } mon->handler.dataChangeCallback(client, sub->subscriptionId, sub->context, mon->monitoredItemId, mon->context, &min->value); } } static void processEventNotification(UA_Client *client, UA_Client_Subscription *sub, UA_EventNotificationList *eventNotificationList) { for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) { UA_EventFieldList *eventFieldList = &eventNotificationList->events[j]; /* Find the MonitoredItem */ UA_Client_MonitoredItem *mon; LIST_FOREACH(mon, &sub->monitoredItems, listEntry) { if(mon->monitoredItemId == eventFieldList->clientHandle) break; } if(!mon) { UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT, "Could not process a notification with clienthandle %u on subscription %u", eventFieldList->clientHandle, sub->subscriptionId); continue; } if(!mon->isEventMonitoredItem) { UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT, "MonitoredItem is configured for DataChanges. But received a " "EventNotification."); continue; } mon->handler.eventCallback(client, sub->subscriptionId, sub->context, mon->monitoredItemId, mon->context, eventFieldList->eventFieldsSize, eventFieldList->eventFields); } } static void processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub, UA_ExtensionObject *msg) { if(msg->encoding != UA_EXTENSIONOBJECT_DECODED) return; /* Handle DataChangeNotification */ if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) { UA_DataChangeNotification *dataChangeNotification = (UA_DataChangeNotification *)msg->content.decoded.data; processDataChangeNotification(client, sub, dataChangeNotification); return; } /* Handle EventNotification */ if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) { UA_EventNotificationList *eventNotificationList = (UA_EventNotificationList *)msg->content.decoded.data; processEventNotification(client, sub, eventNotificationList); return; } /* Handle StatusChangeNotification */ if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) { if(sub->statusChangeCallback) { sub->statusChangeCallback(client, sub->subscriptionId, sub->context, (UA_StatusChangeNotification*)msg->content.decoded.data); } else { UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT, "Dropped a StatusChangeNotification since no callback is registered"); } return; } UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT, "Unknown notification message type"); } void UA_Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request, UA_PublishResponse *response) { UA_NotificationMessage *msg = &response->notificationMessage; client->currentlyOutStandingPublishRequests--; if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS) { if(client->config.outStandingPublishRequests > 1) { client->config.outStandingPublishRequests--; UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT, "Too many publishrequest, reduce outStandingPublishRequests to %d", client->config.outStandingPublishRequests); } else { UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT, "Too many publishrequest when outStandingPublishRequests = 1"); UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId); } return; } if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSHUTDOWN) return; if(!LIST_FIRST(&client->subscriptions)) { response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION; return; } if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID) { UA_Client_close(client); /* TODO: This should be handled before the process callback */ UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT, "Received BadSessionIdInvalid"); return; } if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) { UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT, "Received Publish Response with code %s", UA_StatusCode_name(response->responseHeader.serviceResult)); return; } UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId); if(!sub) { response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR; UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT, "Received Publish Response for a non-existant subscription"); return; } sub->lastActivity = UA_DateTime_nowMonotonic(); /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */ if((sub->sequenceNumber != msg->sequenceNumber) && (msg->sequenceNumber != 0) && (UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber)) { UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT, "Invalid subscritpion sequenceNumber"); UA_Client_close(client); return; } sub->sequenceNumber = msg->sequenceNumber; /* Process the notification messages */ for(size_t k = 0; k < msg->notificationDataSize; ++k) processNotificationMessage(client, sub, &msg->notificationData[k]); /* Add to the list of pending acks */ for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) { if(response->availableSequenceNumbers[i] != msg->sequenceNumber) continue; UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*) UA_malloc(sizeof(UA_Client_NotificationsAckNumber)); if(!tmpAck) { UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT, "Not enough memory to store the acknowledgement for a publish " "message on subscription %u", sub->subscriptionId); break; } tmpAck->subAck.sequenceNumber = msg->sequenceNumber; tmpAck->subAck.subscriptionId = sub->subscriptionId; LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry); break; } } static void processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId, void *response, const UA_DataType *responseType) { UA_PublishRequest *req = (UA_PublishRequest*)userdata; UA_PublishResponse *res = (UA_PublishResponse*)response; /* Process the response */ UA_Client_Subscriptions_processPublishResponse(client, req, res); /* Delete the cached request */ UA_PublishRequest_delete(req); /* Fill up the outstanding publish requests */ UA_Client_Subscriptions_backgroundPublish(client); } void UA_Client_Subscriptions_clean(UA_Client *client) { UA_Client_NotificationsAckNumber *n, *tmp; LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) { LIST_REMOVE(n, listEntry); UA_free(n); } UA_Client_Subscription *sub, *tmps; LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps) UA_Client_Subscription_deleteInternal(client, sub); /* force local removal */ client->monitoredItemHandles = 0; } void UA_Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) { if(client->state < UA_CLIENTSTATE_SESSION) return; /* Is the lack of responses the client's fault? */ if(client->currentlyOutStandingPublishRequests == 0) return; UA_Client_Subscription *sub; LIST_FOREACH(sub, &client->subscriptions, listEntry) { UA_DateTime maxSilence = (UA_DateTime) ((sub->publishingInterval * sub->maxKeepAliveCount) + client->config.timeout) * UA_DATETIME_MSEC; if(maxSilence + sub->lastActivity < UA_DateTime_nowMonotonic()) { /* Reset activity */ sub->lastActivity = UA_DateTime_nowMonotonic(); if (client->config.subscriptionInactivityCallback) client->config.subscriptionInactivityCallback(client, sub->subscriptionId, sub->context); UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT, "Inactivity for Subscription %u.", sub->subscriptionId); } } } UA_StatusCode UA_Client_Subscriptions_backgroundPublish(UA_Client *client) { if(client->state < UA_CLIENTSTATE_SESSION) return UA_STATUSCODE_BADSERVERNOTCONNECTED; /* The session must have at least one subscription */ if(!LIST_FIRST(&client->subscriptions)) return UA_STATUSCODE_GOOD; while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) { UA_PublishRequest *request = UA_PublishRequest_new(); if (!request) return UA_STATUSCODE_BADOUTOFMEMORY; UA_StatusCode retval = UA_Client_preparePublishRequest(client, request); if(retval != UA_STATUSCODE_GOOD) { UA_PublishRequest_delete(request); return retval; } UA_UInt32 requestId; client->currentlyOutStandingPublishRequests++; retval = __UA_Client_AsyncService(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST], processPublishResponseAsync, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE], (void*)request, &requestId); if(retval != UA_STATUSCODE_GOOD) { UA_PublishRequest_delete(request); return retval; } } return UA_STATUSCODE_GOOD; } #endif /* UA_ENABLE_SUBSCRIPTIONS */