/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 *    Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
 *    Copyright 2016-2017 (c) Florian Palm
 *    Copyright 2015 (c) Chris Iatrou
 *    Copyright 2015-2016 (c) Sten Grüner
 *    Copyright 2015-2016 (c) Oleksiy Vasylyev
 *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
 *    Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
 *    Copyright 2017 (c) Mattias Bornhager
 *    Copyright 2017 (c) Henrik Norrman
 *    Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
 *    Copyright 2018 (c) Fabian Arndt, Root-Core
 */

#include "ua_server_internal.h"
#include "ua_services.h"
#include "ua_subscription.h"

#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */

/* Apply the requested parameters to a subscription and (re-)register its
 * publish callback.
 *
 * The publishing interval, keep-alive count and lifetime count are passed
 * through UA_BOUNDEDVALUE_SETWBOUNDS together with the corresponding limits
 * from the server config (presumably a clamp to [min, max]; the macro is
 * defined elsewhere). Afterwards:
 *  - a NaN publishing interval is replaced by the configured minimum,
 *  - the lifetime count is raised to at least 3 * maxKeepAliveCount,
 *  - notificationsPerPublish falls back to the configured maximum when the
 *    request is 0 or exceeds that maximum.
 *
 * Returns the status code of Subscription_registerPublishCallback. Note that
 * the previous callback has already been unregistered when this fails. */
static UA_StatusCode
setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
                        UA_Double requestedPublishingInterval,
                        UA_UInt32 requestedLifetimeCount,
                        UA_UInt32 requestedMaxKeepAliveCount,
                        UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
    /* Deregister the callback if required */
    Subscription_unregisterPublishCallback(server, subscription);

    /* Re-parameterize the subscription */
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
                               requestedPublishingInterval,
                               subscription->publishingInterval);

    /* NaN is the only value that compares unequal to itself */
    if(requestedPublishingInterval != requestedPublishingInterval)
        subscription->publishingInterval = server->config.publishingIntervalLimits.min;

    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
                               requestedMaxKeepAliveCount,
                               subscription->maxKeepAliveCount);
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
                               requestedLifetimeCount,
                               subscription->lifeTimeCount);

    /* The lifetime has to span at least three keep-alive periods */
    if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount)
        subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount;

    /* Zero or a too large value selects the server-side maximum */
    subscription->notificationsPerPublish = maxNotificationsPerPublish;
    if(maxNotificationsPerPublish == 0 ||
       maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
        subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;

    subscription->priority = priority;

    UA_StatusCode retval = Subscription_registerPublishCallback(server, subscription);
    if(retval != UA_STATUSCODE_GOOD) {
        UA_LOG_DEBUG_SESSION(&server->config.logger, subscription->session,
                             "Subscription %u | Could not register publish callback with error code %s",
                             subscription->subscriptionId, UA_StatusCode_name(retval));
    }
    return retval;
}

/* CreateSubscription service.
 *
 * Rejects the request with BADTOOMANYSUBSCRIPTIONS when a configured
 * server-wide or per-session limit is reached (a limit of 0 disables the
 * check). Otherwise a subscription is created, attached to the session and
 * configured from the request. On success the response carries the
 * subscription id and the revised parameters. On failure only
 * responseHeader.serviceResult is set. */
void
Service_CreateSubscription(UA_Server *server, UA_Session *session,
                           const UA_CreateSubscriptionRequest *request,
                           UA_CreateSubscriptionResponse *response) {
    /* Check limits for the number of subscriptions */
    if(((server->config.maxSubscriptions != 0) &&
        (server->numSubscriptions >= server->config.maxSubscriptions)) ||
       ((server->config.maxSubscriptionsPerSession != 0) &&
        (session->numSubscriptions >= server->config.maxSubscriptionsPerSession))) {
        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS;
        return;
    }

    /* Create the subscription */
    UA_Subscription *newSubscription =
        UA_Subscription_new(session, response->subscriptionId);
    if(!newSubscription) {
        UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                             "Processing CreateSubscriptionRequest failed");
        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
        return;
    }

    UA_Session_addSubscription(server, session, newSubscription); /* Also assigns the subscription id */

    /* Set the subscription parameters */
    newSubscription->publishingEnabled = request->publishingEnabled;
    UA_StatusCode retval =
        setSubscriptionSettings(server, newSubscription,
                                request->requestedPublishingInterval,
                                request->requestedLifetimeCount,
                                request->requestedMaxKeepAliveCount,
                                request->maxNotificationsPerPublish,
                                request->priority);
    if(retval != UA_STATUSCODE_GOOD) {
        /* The subscription is already attached to the session. Remove it
         * again: it has no publish callback and the client never learns its
         * id, so it could not be used or deleted by the client. */
        (void)UA_Session_deleteSubscription(server, session,
                                            newSubscription->subscriptionId);
        response->responseHeader.serviceResult = retval;
        return;
    }

    /* Needs the revised maxKeepAliveCount, so the settings are applied first */
    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount;

    /* Prepare the response */
    response->subscriptionId = newSubscription->subscriptionId;
    response->revisedPublishingInterval = newSubscription->publishingInterval;
    response->revisedLifetimeCount = newSubscription->lifeTimeCount;
    response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;

    UA_LOG_INFO_SESSION(&server->config.logger, session, "Subscription %u | "
                        "Created the Subscription with a publishing interval of %.2f ms",
                        response->subscriptionId, newSubscription->publishingInterval);
}

/* ModifySubscription service.
 *
 * Looks up the subscription in the session (BADSUBSCRIPTIONIDINVALID if it
 * does not exist), applies the requested settings, resets the lifetime
 * counter and reports the revised parameters in the response. */
void
Service_ModifySubscription(UA_Server *server, UA_Session *session,
                           const UA_ModifySubscriptionRequest *request,
                           UA_ModifySubscriptionResponse *response) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                         "Processing ModifySubscriptionRequest");

    UA_Subscription *sub =
        UA_Session_getSubscriptionById(session, request->subscriptionId);
    if(!sub) {
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
        return;
    }

    UA_StatusCode retval =
        setSubscriptionSettings(server, sub,
                                request->requestedPublishingInterval,
                                request->requestedLifetimeCount,
                                request->requestedMaxKeepAliveCount,
                                request->maxNotificationsPerPublish,
                                request->priority);
    if(retval != UA_STATUSCODE_GOOD) {
        response->responseHeader.serviceResult = retval;
        return;
    }

    sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */

    response->revisedPublishingInterval = sub->publishingInterval;
    response->revisedLifetimeCount = sub->lifeTimeCount;
    response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
}

/* Per-subscription operation of SetPublishingMode. Writes
 * BADSUBSCRIPTIONIDINVALID to *result for an unknown id. Otherwise *result
 * is left untouched, the lifetime counter is reset and the publishing mode
 * is updated. The server argument is not used. */
static void
Operation_SetPublishingMode(UA_Server *server, UA_Session *session,
                            const UA_Boolean *publishingEnabled,
                            const UA_UInt32 *subscriptionId,
                            UA_StatusCode *result) {
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId);
    if(!sub) {
        *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
        return;
    }

    sub->currentLifetimeCount = 0;               /* Reset the subscription lifetime */
    sub->publishingEnabled = *publishingEnabled; /* Set the publishing mode */
}

/* SetPublishingMode service. Runs Operation_SetPublishingMode for every
 * subscription id in the request via UA_Server_processServiceOperations. */
void
Service_SetPublishingMode(UA_Server *server, UA_Session *session,
                          const UA_SetPublishingModeRequest *request,
                          UA_SetPublishingModeResponse *response) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                         "Processing SetPublishingModeRequest");

    /* The request is const, so hand a local copy to the operation */
    UA_Boolean publishingEnabled = request->publishingEnabled;

    response->responseHeader.serviceResult =
        UA_Server_processServiceOperations(server, session,
                                           (UA_ServiceOperation)Operation_SetPublishingMode,
                                           &publishingEnabled,
                                           &request->subscriptionIdsSize,
                                           &UA_TYPES[UA_TYPES_UINT32],
                                           &response->resultsSize,
                                           &UA_TYPES[UA_TYPES_STATUSCODE]);
}

/* Send a PublishResponse that only carries an error status.
 *
 * requestHandle is copied into the response header. requestId is the id
 * under which the message is sent on the secure channel. Mind the argument
 * order: both are UA_UInt32, so the compiler cannot detect a swap.
 *
 * TODO: Unify with senderror in ua_server_binary.c */
static void
subscriptionSendError(UA_SecureChannel *channel, UA_UInt32 requestHandle,
                      UA_UInt32 requestId, UA_StatusCode error) {
    UA_PublishResponse err_response;
    UA_PublishResponse_init(&err_response);
    err_response.responseHeader.requestHandle = requestHandle;
    err_response.responseHeader.timestamp = UA_DateTime_now();
    err_response.responseHeader.serviceResult = error;
    UA_SecureChannel_sendSymmetricMessage(channel, requestId, UA_MESSAGETYPE_MSG,
                                          &err_response,
                                          &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
}

/* Publish service.
 *
 * There is no response argument: the response is allocated here and queued
 * in the session. Errors that prevent queueing are answered right away with
 * subscriptionSendError. After processing the acknowledgements of the
 * request, the first late subscription (if any) is published immediately. */
void
Service_Publish(UA_Server *server, UA_Session *session,
                const UA_PublishRequest *request, UA_UInt32 requestId) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Processing PublishRequest");

    /* Named local so that handle and id cannot be mixed up in the calls below */
    const UA_UInt32 requestHandle = request->requestHeader.requestHandle;

    /* Return an error if the session has no subscription */
    if(LIST_EMPTY(&session->serverSubscriptions)) {
        subscriptionSendError(session->header.channel, requestHandle,
                              requestId, UA_STATUSCODE_BADNOSUBSCRIPTION);
        return;
    }

    /* Handle too many subscriptions to free resources before trying to allocate
     * resources for the new publish request. If the limit has been reached the
     * oldest publish request shall be responded */
    if((server->config.maxPublishReqPerSession != 0) &&
       (session->numPublishReq >= server->config.maxPublishReqPerSession)) {
        if(!UA_Subscription_reachedPublishReqLimit(server, session)) {
            subscriptionSendError(session->header.channel, requestHandle,
                                  requestId, UA_STATUSCODE_BADINTERNALERROR);
            return;
        }
    }

    /* Allocate the response to store it in the retransmission queue */
    UA_PublishResponseEntry *entry = (UA_PublishResponseEntry *)
        UA_malloc(sizeof(UA_PublishResponseEntry));
    if(!entry) {
        subscriptionSendError(session->header.channel, requestHandle,
                              requestId, UA_STATUSCODE_BADOUTOFMEMORY);
        return;
    }

    /* Prepare the response */
    entry->requestId = requestId;
    UA_PublishResponse *response = &entry->response;
    UA_PublishResponse_init(response);
    response->responseHeader.requestHandle = requestHandle;

    /* Allocate the results array holding one status per acknowledgement */
    if(request->subscriptionAcknowledgementsSize > 0) {
        response->results = (UA_StatusCode *)
            UA_Array_new(request->subscriptionAcknowledgementsSize,
                         &UA_TYPES[UA_TYPES_STATUSCODE]);
        if(!response->results) {
            UA_free(entry);
            subscriptionSendError(session->header.channel, requestHandle,
                                  requestId, UA_STATUSCODE_BADOUTOFMEMORY);
            return;
        }
        response->resultsSize = request->subscriptionAcknowledgementsSize;
    }

    /* Delete Acknowledged Subscription Messages */
    for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; ++i) {
        UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i];
        UA_Subscription *sub = UA_Session_getSubscriptionById(session, ack->subscriptionId);
        if(!sub) {
            response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
            UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                                 "Cannot process acknowledgements subscription %u",
                                 ack->subscriptionId);
            continue;
        }
        /* Remove the acked transmission from the retransmission queue */
        response->results[i] =
            UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber);
    }

    /* Queue the publish response. It will be dequeued in a repeated publish
     * callback. This can also be triggered right now for a late
     * subscription. */
    UA_Session_queuePublishReq(session, entry, false);
    UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Queued a publication message");

    /* If there are late subscriptions, the new publish request is used to
     * answer them immediately. However, a single subscription that generates
     * many notifications must not "starve" other late subscriptions. Therefore
     * we keep track of the last subscription that got preferential treatment.
     * We start searching for late subscriptions **after** the last one. */
    UA_Subscription *immediate = NULL;
    if(session->lastSeenSubscriptionId > 0) {
        /* LIST_FOREACH leaves immediate == NULL if the id is not found */
        LIST_FOREACH(immediate, &session->serverSubscriptions, listEntry) {
            if(immediate->subscriptionId == session->lastSeenSubscriptionId) {
                immediate = LIST_NEXT(immediate, listEntry);
                break;
            }
        }
    }

    /* If no entry was found, start at the beginning and don't restart */
    UA_Boolean found = false;
    if(!immediate)
        immediate = LIST_FIRST(&session->serverSubscriptions);
    else
        found = true;

 repeat:
    while(immediate) {
        if(immediate->state == UA_SUBSCRIPTIONSTATE_LATE) {
            session->lastSeenSubscriptionId = immediate->subscriptionId;
            UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                                 "Subscription %u | Response on a late subscription",
                                 immediate->subscriptionId);
            UA_Subscription_publish(server, immediate);
            return;
        }
        immediate = LIST_NEXT(immediate, listEntry);
    }

    /* Restart at the beginning of the list (at most once) */
    if(found) {
        immediate = LIST_FIRST(&session->serverSubscriptions);
        found = false;
        goto repeat;
    }

    /* No late subscription this time */
    session->lastSeenSubscriptionId = 0;
}

/* Per-subscription operation of DeleteSubscriptions. Stores the status of
 * UA_Session_deleteSubscription in *result and logs the outcome. The third
 * argument is an unused context pointer. */
static void
Operation_DeleteSubscription(UA_Server *server, UA_Session *session, void *_,
                             const UA_UInt32 *subscriptionId, UA_StatusCode *result) {
    *result = UA_Session_deleteSubscription(server, session, *subscriptionId);
    if(*result == UA_STATUSCODE_GOOD) {
        UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                             "Subscription %u | Subscription deleted",
                             *subscriptionId);
    } else {
        UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                             "Deleting Subscription with Id %u failed with error code %s",
                             *subscriptionId, UA_StatusCode_name(*result));
    }
}

/* DeleteSubscriptions service. Deletes every subscription listed in the
 * request. If the session has no subscription left afterwards, the publish
 * requests that are still queued get answered. */
void
Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
                            const UA_DeleteSubscriptionsRequest *request,
                            UA_DeleteSubscriptionsResponse *response) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                         "Processing DeleteSubscriptionsRequest");

    response->responseHeader.serviceResult =
        UA_Server_processServiceOperations(server, session,
                                           (UA_ServiceOperation)Operation_DeleteSubscription,
                                           NULL,
                                           &request->subscriptionIdsSize,
                                           &UA_TYPES[UA_TYPES_UINT32],
                                           &response->resultsSize,
                                           &UA_TYPES[UA_TYPES_STATUSCODE]);

    /* The session has at least one subscription */
    if(LIST_FIRST(&session->serverSubscriptions))
        return;

    /* Send remaining publish responses if the last subscription was removed */
    UA_Subscription_answerPublishRequestsNoSubscription(server, session);
}

/* Republish service.
 *
 * Copies the notification message with the requested sequence number from
 * the retransmission queue of the subscription into the response.
 * serviceResult is BADSUBSCRIPTIONIDINVALID for an unknown subscription and
 * BADMESSAGENOTAVAILABLE if no queued message has that sequence number.
 * Otherwise it is the result of the copy. */
void
Service_Republish(UA_Server *server, UA_Session *session,
                  const UA_RepublishRequest *request,
                  UA_RepublishResponse *response) {
    UA_LOG_DEBUG_SESSION(&server->config.logger, session,
                         "Processing RepublishRequest");

    /* Get the subscription */
    UA_Subscription *sub =
        UA_Session_getSubscriptionById(session, request->subscriptionId);
    if(!sub) {
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
        return;
    }

    /* Reset the subscription lifetime */
    sub->currentLifetimeCount = 0;

    /* Find the notification in the retransmission queue. TAILQ_FOREACH
     * leaves entry == NULL when the queue is exhausted without a match. */
    UA_NotificationMessageEntry *entry;
    TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
        if(entry->message.sequenceNumber == request->retransmitSequenceNumber)
            break;
    }
    if(!entry) {
        response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE;
        return;
    }

    response->responseHeader.serviceResult =
        UA_NotificationMessage_copy(&entry->message, &response->notificationMessage);
}

#endif /* UA_ENABLE_SUBSCRIPTIONS */