|
@@ -1,3 +1,5 @@
|
|
|
|
+#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
|
|
|
|
+
|
|
#include "ua_server_internal.h"
|
|
#include "ua_server_internal.h"
|
|
#include "ua_services.h"
|
|
#include "ua_services.h"
|
|
#include "ua_subscription.h"
|
|
#include "ua_subscription.h"
|
|
@@ -15,21 +17,22 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
|
|
UA_UInt32 requestedMaxKeepAliveCount,
|
|
UA_UInt32 requestedMaxKeepAliveCount,
|
|
UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
|
|
UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
|
|
Subscription_unregisterPublishJob(server, subscription);
|
|
Subscription_unregisterPublishJob(server, subscription);
|
|
- subscription->publishingInterval = requestedPublishingInterval;
|
|
|
|
- UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
|
|
|
|
- requestedPublishingInterval, subscription->publishingInterval);
|
|
|
|
- /* check for nan*/
|
|
|
|
- if(requestedPublishingInterval != requestedPublishingInterval)
|
|
|
|
- subscription->publishingInterval = server->config.publishingIntervalLimits.min;
|
|
|
|
|
|
+ subscription->publishingInterval = requestedPublishingInterval;
|
|
|
|
+ UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
|
|
|
|
+ requestedPublishingInterval, subscription->publishingInterval);
|
|
|
|
+ /* check for nan*/
|
|
|
|
+ if(requestedPublishingInterval != requestedPublishingInterval)
|
|
|
|
+ subscription->publishingInterval = server->config.publishingIntervalLimits.min;
|
|
UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
|
|
UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
|
|
requestedMaxKeepAliveCount, subscription->maxKeepAliveCount);
|
|
requestedMaxKeepAliveCount, subscription->maxKeepAliveCount);
|
|
UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
|
|
UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
|
|
requestedLifetimeCount, subscription->lifeTimeCount);
|
|
requestedLifetimeCount, subscription->lifeTimeCount);
|
|
if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount)
|
|
if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount)
|
|
subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount;
|
|
subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount;
|
|
- subscription->notificationsPerPublish = maxNotificationsPerPublish;
|
|
|
|
- if(maxNotificationsPerPublish == 0 || maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
|
|
|
|
- subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
|
|
|
|
|
|
+ subscription->notificationsPerPublish = maxNotificationsPerPublish;
|
|
|
|
+ if(maxNotificationsPerPublish == 0 ||
|
|
|
|
+ maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
|
|
|
|
+ subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
|
|
subscription->priority = priority;
|
|
subscription->priority = priority;
|
|
Subscription_registerPublishJob(server, subscription);
|
|
Subscription_registerPublishJob(server, subscription);
|
|
}
|
|
}
|
|
@@ -44,12 +47,13 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- UA_Session_addSubscription(session, newSubscription);
|
|
|
|
|
|
+ UA_Session_addSubscription(session, newSubscription);
|
|
newSubscription->publishingEnabled = request->publishingEnabled;
|
|
newSubscription->publishingEnabled = request->publishingEnabled;
|
|
setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
|
|
setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
|
|
request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
|
|
request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
|
|
request->maxNotificationsPerPublish, request->priority);
|
|
request->maxNotificationsPerPublish, request->priority);
|
|
- newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount; /* immediately send the first response */
|
|
|
|
|
|
+ /* immediately send the first response */
|
|
|
|
+ newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount;
|
|
response->revisedPublishingInterval = newSubscription->publishingInterval;
|
|
response->revisedPublishingInterval = newSubscription->publishingInterval;
|
|
response->revisedLifetimeCount = newSubscription->lifeTimeCount;
|
|
response->revisedLifetimeCount = newSubscription->lifeTimeCount;
|
|
response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;
|
|
response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;
|
|
@@ -75,30 +79,31 @@ void Service_ModifySubscription(UA_Server *server, UA_Session *session,
|
|
}
|
|
}
|
|
|
|
|
|
void Service_SetPublishingMode(UA_Server *server, UA_Session *session,
|
|
void Service_SetPublishingMode(UA_Server *server, UA_Session *session,
|
|
- const UA_SetPublishingModeRequest *request, UA_SetPublishingModeResponse *response) {
|
|
|
|
-
|
|
|
|
- if(request->subscriptionIdsSize <= 0) {
|
|
|
|
- response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- size_t size = request->subscriptionIdsSize;
|
|
|
|
- response->results = UA_Array_new(size, &UA_TYPES[UA_TYPES_STATUSCODE]);
|
|
|
|
- if(!response->results) {
|
|
|
|
- response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- response->resultsSize = size;
|
|
|
|
- for(size_t i = 0; i < size; i++) {
|
|
|
|
- UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionIds[i]);
|
|
|
|
- if(!sub) {
|
|
|
|
- response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- sub->publishingEnabled = request->publishingEnabled;
|
|
|
|
|
|
+ const UA_SetPublishingModeRequest *request,
|
|
|
|
+ UA_SetPublishingModeResponse *response) {
|
|
|
|
+
|
|
|
|
+ if(request->subscriptionIdsSize <= 0) {
|
|
|
|
+ response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ size_t size = request->subscriptionIdsSize;
|
|
|
|
+ response->results = UA_Array_new(size, &UA_TYPES[UA_TYPES_STATUSCODE]);
|
|
|
|
+ if(!response->results) {
|
|
|
|
+ response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ response->resultsSize = size;
|
|
|
|
+ for(size_t i = 0; i < size; i++) {
|
|
|
|
+ UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionIds[i]);
|
|
|
|
+ if(!sub) {
|
|
|
|
+ response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ sub->publishingEnabled = request->publishingEnabled;
|
|
sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
|
|
sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
|
|
- }
|
|
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
static void
|
|
@@ -109,12 +114,12 @@ setMonitoredItemSettings(UA_Server *server, UA_MonitoredItem *mon,
|
|
MonitoredItem_unregisterSampleJob(server, mon);
|
|
MonitoredItem_unregisterSampleJob(server, mon);
|
|
mon->monitoringMode = monitoringMode;
|
|
mon->monitoringMode = monitoringMode;
|
|
mon->clientHandle = clientHandle;
|
|
mon->clientHandle = clientHandle;
|
|
- mon->samplingInterval = samplingInterval;
|
|
|
|
- UA_BOUNDEDVALUE_SETWBOUNDS(server->config.samplingIntervalLimits,
|
|
|
|
- samplingInterval, mon->samplingInterval);
|
|
|
|
- /* Check for nan */
|
|
|
|
- if(samplingInterval != samplingInterval)
|
|
|
|
- mon->samplingInterval = server->config.samplingIntervalLimits.min;
|
|
|
|
|
|
+ mon->samplingInterval = samplingInterval;
|
|
|
|
+ UA_BOUNDEDVALUE_SETWBOUNDS(server->config.samplingIntervalLimits,
|
|
|
|
+ samplingInterval, mon->samplingInterval);
|
|
|
|
+ /* Check for nan */
|
|
|
|
+ if(samplingInterval != samplingInterval)
|
|
|
|
+ mon->samplingInterval = server->config.samplingIntervalLimits.min;
|
|
UA_BOUNDEDVALUE_SETWBOUNDS(server->config.queueSizeLimits,
|
|
UA_BOUNDEDVALUE_SETWBOUNDS(server->config.queueSizeLimits,
|
|
queueSize, mon->maxQueueSize);
|
|
queueSize, mon->maxQueueSize);
|
|
mon->discardOldest = discardOldest;
|
|
mon->discardOldest = discardOldest;
|
|
@@ -136,7 +141,7 @@ Service_CreateMonitoredItems_single(UA_Server *server, UA_Session *session, UA_S
|
|
// TODO: Check if the target node type has the requested attribute
|
|
// TODO: Check if the target node type has the requested attribute
|
|
|
|
|
|
/* Check if the encoding is supported */
|
|
/* Check if the encoding is supported */
|
|
- if(request->itemToMonitor.dataEncoding.name.length > 0 &&
|
|
|
|
|
|
+ if(request->itemToMonitor.dataEncoding.name.length > 0 &&
|
|
!UA_String_equal(&binaryEncoding, &request->itemToMonitor.dataEncoding.name)) {
|
|
!UA_String_equal(&binaryEncoding, &request->itemToMonitor.dataEncoding.name)) {
|
|
result->statusCode = UA_STATUSCODE_BADDATAENCODINGUNSUPPORTED;
|
|
result->statusCode = UA_STATUSCODE_BADDATAENCODINGUNSUPPORTED;
|
|
return;
|
|
return;
|
|
@@ -184,7 +189,6 @@ Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
|
|
|
|
|
|
/* Reset the subscription lifetime */
|
|
/* Reset the subscription lifetime */
|
|
sub->currentLifetimeCount = 0;
|
|
sub->currentLifetimeCount = 0;
|
|
-
|
|
|
|
if(request->itemsToCreateSize <= 0) {
|
|
if(request->itemsToCreateSize <= 0) {
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
|
|
return;
|
|
return;
|
|
@@ -233,7 +237,6 @@ void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
|
|
|
|
|
|
/* Reset the subscription lifetime */
|
|
/* Reset the subscription lifetime */
|
|
sub->currentLifetimeCount = 0;
|
|
sub->currentLifetimeCount = 0;
|
|
-
|
|
|
|
if(request->itemsToModifySize <= 0) {
|
|
if(request->itemsToModifySize <= 0) {
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
|
|
return;
|
|
return;
|
|
@@ -255,24 +258,24 @@ void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
|
|
|
|
|
|
void
|
|
void
|
|
Service_Publish(UA_Server *server, UA_Session *session,
|
|
Service_Publish(UA_Server *server, UA_Session *session,
|
|
- const UA_PublishRequest *request, UA_UInt32 requestId) {
|
|
|
|
- /* Return an error if the session has no subscription */
|
|
|
|
- if(LIST_EMPTY(&session->serverSubscriptions)) {
|
|
|
|
- UA_PublishResponse response;
|
|
|
|
- UA_PublishResponse_init(&response);
|
|
|
|
- response.responseHeader.requestHandle = request->requestHeader.requestHandle;
|
|
|
|
- response.responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
|
|
|
|
- UA_SecureChannel_sendBinaryMessage(session->channel, requestId, &response,
|
|
|
|
- &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // todo error handling for malloc
|
|
|
|
|
|
+ const UA_PublishRequest *request, UA_UInt32 requestId) {
|
|
|
|
+ /* Return an error if the session has no subscription */
|
|
|
|
+ if(LIST_EMPTY(&session->serverSubscriptions)) {
|
|
|
|
+ UA_PublishResponse response;
|
|
|
|
+ UA_PublishResponse_init(&response);
|
|
|
|
+ response.responseHeader.requestHandle = request->requestHeader.requestHandle;
|
|
|
|
+ response.responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
|
|
|
|
+ UA_SecureChannel_sendBinaryMessage(session->channel, requestId, &response,
|
|
|
|
+ &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // todo error handling for malloc
|
|
UA_PublishResponseEntry *entry = UA_malloc(sizeof(UA_PublishResponseEntry));
|
|
UA_PublishResponseEntry *entry = UA_malloc(sizeof(UA_PublishResponseEntry));
|
|
- entry->requestId = requestId;
|
|
|
|
|
|
+ entry->requestId = requestId;
|
|
UA_PublishResponse *response = &entry->response;
|
|
UA_PublishResponse *response = &entry->response;
|
|
- UA_PublishResponse_init(response);
|
|
|
|
- response->responseHeader.requestHandle = request->requestHeader.requestHandle;
|
|
|
|
|
|
+ UA_PublishResponse_init(response);
|
|
|
|
+ response->responseHeader.requestHandle = request->requestHeader.requestHandle;
|
|
|
|
|
|
/* Delete Acknowledged Subscription Messages */
|
|
/* Delete Acknowledged Subscription Messages */
|
|
response->results = UA_malloc(request->subscriptionAcknowledgementsSize * sizeof(UA_StatusCode));
|
|
response->results = UA_malloc(request->subscriptionAcknowledgementsSize * sizeof(UA_StatusCode));
|
|
@@ -335,7 +338,7 @@ void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
|
|
|
|
|
|
for(size_t i = 0; i < request->subscriptionIdsSize; i++)
|
|
for(size_t i = 0; i < request->subscriptionIdsSize; i++)
|
|
response->results[i] = UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
|
|
response->results[i] = UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
|
|
-}
|
|
|
|
|
|
+}
|
|
|
|
|
|
void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
|
|
void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
|
|
const UA_DeleteMonitoredItemsRequest *request,
|
|
const UA_DeleteMonitoredItemsRequest *request,
|
|
@@ -348,7 +351,6 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
|
|
|
|
|
|
/* Reset the subscription lifetime */
|
|
/* Reset the subscription lifetime */
|
|
sub->currentLifetimeCount = 0;
|
|
sub->currentLifetimeCount = 0;
|
|
-
|
|
|
|
response->results = UA_malloc(sizeof(UA_StatusCode) * request->monitoredItemIdsSize);
|
|
response->results = UA_malloc(sizeof(UA_StatusCode) * request->monitoredItemIdsSize);
|
|
if(!response->results) {
|
|
if(!response->results) {
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
|
|
@@ -371,7 +373,7 @@ void Service_Republish(UA_Server *server, UA_Session *session, const UA_Republis
|
|
|
|
|
|
/* Reset the subscription lifetime */
|
|
/* Reset the subscription lifetime */
|
|
sub->currentLifetimeCount = 0;
|
|
sub->currentLifetimeCount = 0;
|
|
-
|
|
|
|
|
|
+
|
|
/* Find the notification in the retransmission queue */
|
|
/* Find the notification in the retransmission queue */
|
|
UA_NotificationMessageEntry *entry;
|
|
UA_NotificationMessageEntry *entry;
|
|
LIST_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
|
|
LIST_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
|
|
@@ -384,3 +386,5 @@ void Service_Republish(UA_Server *server, UA_Session *session, const UA_Republis
|
|
else
|
|
else
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE;
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+#endif /* UA_ENABLE_SUBSCRIPTIONS */
|