Browse Source

Added resource limitations for subscriptions and monitored items

Added support for:
Max number of subscriptions per seession
Max number of publish request per session
Max number of monitered Items per subscription

0 means limited by heap (old behaviour)
Mattias Bornhager 7 years ago
parent
commit
fe498381f7

+ 5 - 0
include/ua_server_config.h

@@ -75,6 +75,7 @@ struct UA_ServerConfig {
     UA_Double maxSessionTimeout; /* in ms */
 
     /* Limits for Subscriptions */
+    UA_UInt32 maxSubscriptionsPerSession;
     UA_DurationRange publishingIntervalLimits;
     UA_UInt32Range lifeTimeCountLimits;
     UA_UInt32Range keepAliveCountLimits;
@@ -82,9 +83,13 @@ struct UA_ServerConfig {
     UA_UInt32 maxRetransmissionQueueSize; /* 0 -> unlimited size */
 
     /* Limits for MonitoredItems */
+    UA_UInt32 maxMonitoredItemsPerSubscription;
     UA_DurationRange samplingIntervalLimits;
     UA_UInt32Range queueSizeLimits; /* Negotiated with the client */
 
+    /* Limits for PublishRequests */
+    UA_UInt32 maxPublishReqPerSession;
+
     /* Discovery */
 #ifdef UA_ENABLE_DISCOVERY
     /* Timeout in seconds when to automatically remove a registered server from

+ 24 - 2
src/server/ua_services_subscription.c

@@ -57,6 +57,12 @@ void
 Service_CreateSubscription(UA_Server *server, UA_Session *session,
                            const UA_CreateSubscriptionRequest *request,
                            UA_CreateSubscriptionResponse *response) {
+
+    if((server->config.maxSubscriptionsPerSession != 0) &&
+       (UA_Session_getNumSubscriptions(session) >= server->config.maxSubscriptionsPerSession)) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYOPERATIONS;
+        return;
+   }
     /* Create the subscription */
     UA_Subscription *newSubscription = UA_Subscription_new(session, response->subscriptionId);
     if(!newSubscription) {
@@ -259,7 +265,8 @@ Operation_CreateMonitoredItem(UA_Server *server, UA_Session *session,
     newMon->timestampsToReturn = op_timestampsToReturn2;
     setMonitoredItemSettings(server, newMon, request->monitoringMode,
                              &request->requestedParameters);
-    LIST_INSERT_HEAD(&op_sub->monitoredItems, newMon, listEntry);
+
+    UA_Subscription_addMonitoredItem(op_sub, newMon);
 
     /* Create the first sample */
     if(request->monitoringMode == UA_MONITORINGMODE_REPORTING)
@@ -293,6 +300,13 @@ Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
         return;
     }
 
+    if((server->config.maxMonitoredItemsPerSubscription != 0) &&
+       ((UA_Subscription_getNumMonitoredItems(op_sub) + request->itemsToCreateSize) >
+        server->config.maxMonitoredItemsPerSubscription)) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYMONITOREDITEMS;
+        return;
+    }
+
     /* Reset the subscription lifetime */
     op_sub->currentLifetimeCount = 0;
 
@@ -424,6 +438,14 @@ Service_Publish(UA_Server *server, UA_Session *session,
         return;
     }
 
+    if((server->config.maxPublishReqPerSession != 0 ) &&
+       (UA_Session_getNumPublishReq(session) >= server->config.maxPublishReqPerSession)){
+        subscriptionSendError(session->channel, requestId,
+                              request->requestHeader.requestHandle,
+                              UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS);
+        return;
+    }
+
     UA_PublishResponseEntry *entry =
         (UA_PublishResponseEntry*)UA_malloc(sizeof(UA_PublishResponseEntry));
     if(!entry) {
@@ -469,7 +491,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
     }
 
     /* Queue the publish response */
-    SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
+    UA_Session_addPublishReq(session, entry);
     UA_LOG_DEBUG_SESSION(server->config.logger, session, "Queued a publication message");
 
     /* Answer immediately to a late subscription */

+ 17 - 4
src/server/ua_subscription.c

@@ -18,6 +18,7 @@ UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID) {
     /* Remaining members are covered by calloc zeroing out the memory */
     newItem->session = session;
     newItem->subscriptionID = subscriptionID;
+    newItem->numMonitoredItems = 0;
     newItem->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
     TAILQ_INIT(&newItem->retransmissionQueue);
     return newItem;
@@ -72,9 +73,21 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
     /* Remove the MonitoredItem */
     LIST_REMOVE(mon, listEntry);
     MonitoredItem_delete(server, mon);
+    sub->numMonitoredItems--;
     return UA_STATUSCODE_GOOD;
 }
 
+void
+UA_Subscription_addMonitoredItem(UA_Subscription *sub, UA_MonitoredItem *newMon) {
+    sub->numMonitoredItems++;
+    LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
+}
+
+UA_UInt32
+UA_Subscription_getNumMonitoredItems(UA_Subscription *sub) {
+    return sub->numMonitoredItems;
+}
+
 static size_t
 countQueuedNotifications(UA_Subscription *sub,
                          UA_Boolean *moreNotifications) {
@@ -250,7 +263,7 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         return;
 
     /* Dequeue a response */
-    UA_PublishResponseEntry *pre = SIMPLEQ_FIRST(&sub->session->responseQueue);
+    UA_PublishResponseEntry *pre = UA_Session_getPublishReq(sub->session);
 
     /* Cannot publish without a response */
     if(!pre) {
@@ -302,7 +315,7 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     /* <-- The point of no return --> */
 
     /* Remove the response from the response queue */
-    SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
+    UA_Session_removePublishReq(sub->session, pre);
 
     /* Set up the response */
     response->responseHeader.timestamp = UA_DateTime_now();
@@ -417,8 +430,8 @@ UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server,
 
     /* Send a response for every queued request */
     UA_PublishResponseEntry *pre;
-    while((pre = SIMPLEQ_FIRST(&session->responseQueue))) {
-        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
+    while((pre = UA_Session_getPublishReq(session))) {
+        UA_Session_removePublishReq(session, pre);
         UA_PublishResponse *response = &pre->response;
         response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
         response->responseHeader.timestamp = UA_DateTime_now();

+ 7 - 1
src/server/ua_subscription.h

@@ -102,7 +102,7 @@ struct UA_Subscription {
     UA_UInt32 currentKeepAliveCount;
     UA_UInt32 currentLifetimeCount;
     UA_UInt32 lastMonitoredItemId;
-
+    UA_UInt32 numMonitoredItems;
     /* Publish Callback */
     UA_UInt64 publishCallbackId;
     UA_Boolean publishCallbackIsRegistered;
@@ -128,6 +128,12 @@ UA_StatusCode
 UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
                                     UA_UInt32 monitoredItemID);
 
+void
+UA_Subscription_addMonitoredItem(UA_Subscription *sub,
+                                 UA_MonitoredItem *newMon);
+UA_UInt32
+UA_Subscription_getNumMonitoredItems(UA_Subscription *sub);
+
 UA_MonitoredItem *
 UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID);
 

+ 47 - 2
src/ua_session.c

@@ -32,6 +32,8 @@ UA_Session adminSession = {
     0, /* .lastSubscriptionID */
     {NULL}, /* .serverSubscriptions */
     {NULL, NULL}, /* .responseQueue */
+    0, /* numSubscriptions */
+    0  /* numPublishReq */
 #endif
 };
 
@@ -53,6 +55,8 @@ void UA_Session_init(UA_Session *session) {
     LIST_INIT(&session->serverSubscriptions);
     session->lastSubscriptionID = 0;
     SIMPLEQ_INIT(&session->responseQueue);
+    session->numSubscriptions = 0;
+    session->numPublishReq = 0;
 #endif
 }
 
@@ -79,8 +83,8 @@ void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
         UA_free(currents);
     }
     UA_PublishResponseEntry *entry;
-    while((entry = SIMPLEQ_FIRST(&session->responseQueue))) {
-        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
+    while((entry = UA_Session_getPublishReq(session))) {
+        UA_Session_removePublishReq(session,entry);
         UA_PublishResponse_deleteMembers(&entry->response);
         UA_free(entry);
     }
@@ -95,6 +99,7 @@ void UA_Session_updateLifetime(UA_Session *session) {
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 
 void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscription) {
+    session->numSubscriptions++;
     LIST_INSERT_HEAD(&session->serverSubscriptions, newSubscription, listEntry);
 }
 
@@ -104,12 +109,25 @@ UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
     UA_Subscription *sub = UA_Session_getSubscriptionByID(session, subscriptionID);
     if(!sub)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+
     LIST_REMOVE(sub, listEntry);
     UA_Subscription_deleteMembers(sub, server);
     UA_free(sub);
+    if(session->numSubscriptions > 0) {
+        session->numSubscriptions--;
+    }
+    else {
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+
     return UA_STATUSCODE_GOOD;
 }
 
+UA_UInt32
+UA_Session_getNumSubscriptions( UA_Session *session ) {
+   return session->numSubscriptions;
+}
+
 UA_Subscription *
 UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID) {
     UA_Subscription *sub;
@@ -124,4 +142,31 @@ UA_UInt32 UA_Session_getUniqueSubscriptionID(UA_Session *session) {
     return ++(session->lastSubscriptionID);
 }
 
+UA_UInt32
+UA_Session_getNumPublishReq(UA_Session *session) {
+    return session->numPublishReq;
+}
+
+UA_PublishResponseEntry*
+UA_Session_getPublishReq(UA_Session *session) {
+    return SIMPLEQ_FIRST(&session->responseQueue);
+}
+
+void
+UA_Session_removePublishReq( UA_Session *session, UA_PublishResponseEntry* entry){
+    UA_PublishResponseEntry* firstEntry;
+    firstEntry = SIMPLEQ_FIRST(&session->responseQueue);
+
+    /* Remove the response from the response queue */
+    if((firstEntry != 0) && (firstEntry == entry)) {
+        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
+        session->numPublishReq--;
+    }
+}
+
+void UA_Session_addPublishReq( UA_Session *session, UA_PublishResponseEntry* entry){
+    SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
+    session->numPublishReq++;
+}
+
 #endif

+ 18 - 0
src/ua_session.h

@@ -58,6 +58,8 @@ struct UA_Session {
     UA_UInt32 lastSubscriptionID;
     LIST_HEAD(UA_ListOfUASubscriptions, UA_Subscription) serverSubscriptions;
     SIMPLEQ_HEAD(UA_ListOfQueuedPublishResponses, UA_PublishResponseEntry) responseQueue;
+    UA_UInt32        numSubscriptions;
+    UA_UInt32        numPublishReq;
 #endif
 };
 
@@ -74,6 +76,9 @@ void UA_Session_updateLifetime(UA_Session *session);
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscription);
 
+UA_UInt32
+UA_Session_getNumSubscriptions( UA_Session *session );
+
 UA_Subscription *
 UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID);
 
@@ -83,6 +88,19 @@ UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
 
 UA_UInt32
 UA_Session_getUniqueSubscriptionID(UA_Session *session);
+
+UA_UInt32
+UA_Session_getNumPublishReq(UA_Session *session);
+
+UA_PublishResponseEntry*
+UA_Session_getPublishReq(UA_Session *session);
+
+void
+UA_Session_removePublishReq(UA_Session *session, UA_PublishResponseEntry* entry);
+
+void
+UA_Session_addPublishReq(UA_Session *session, UA_PublishResponseEntry* entry);
+
 #endif
 
 /**