Переглянути джерело

Merge pull request #1340 from mattias777/feature/subscription_limit

Added resource limitations for subscriptions and monitored items
Stefan Profanter 7 роки тому
батько
коміт
0fa0a0c767

+ 5 - 0
include/ua_server_config.h

@@ -89,6 +89,7 @@ struct UA_ServerConfig {
     UA_UInt32 maxReferencesPerNode;
 
     /* Limits for Subscriptions */
+    UA_UInt32 maxSubscriptionsPerSession;
     UA_DurationRange publishingIntervalLimits;
     UA_UInt32Range lifeTimeCountLimits;
     UA_UInt32Range keepAliveCountLimits;
@@ -96,9 +97,13 @@ struct UA_ServerConfig {
     UA_UInt32 maxRetransmissionQueueSize; /* 0 -> unlimited size */
 
     /* Limits for MonitoredItems */
+    UA_UInt32 maxMonitoredItemsPerSubscription;
     UA_DurationRange samplingIntervalLimits;
     UA_UInt32Range queueSizeLimits; /* Negotiated with the client */
 
+    /* Limits for PublishRequests */
+    UA_UInt32 maxPublishReqPerSession;
+
     /* Discovery */
 #ifdef UA_ENABLE_DISCOVERY
     /* Timeout in seconds when to automatically remove a registered server from

+ 29 - 2
src/server/ua_services_subscription.c

@@ -57,6 +57,12 @@ void
 Service_CreateSubscription(UA_Server *server, UA_Session *session,
                            const UA_CreateSubscriptionRequest *request,
                            UA_CreateSubscriptionResponse *response) {
+
+    if((server->config.maxSubscriptionsPerSession != 0) &&
+       (UA_Session_getNumSubscriptions(session) >= server->config.maxSubscriptionsPerSession)) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS;
+        return;
+   }
     /* Create the subscription */
     UA_Subscription *newSubscription = UA_Subscription_new(session, response->subscriptionId);
     if(!newSubscription) {
@@ -259,7 +265,8 @@ Operation_CreateMonitoredItem(UA_Server *server, UA_Session *session,
     newMon->timestampsToReturn = op_timestampsToReturn2;
     setMonitoredItemSettings(server, newMon, request->monitoringMode,
                              &request->requestedParameters);
-    LIST_INSERT_HEAD(&op_sub->monitoredItems, newMon, listEntry);
+
+    UA_Subscription_addMonitoredItem(op_sub, newMon);
 
     /* Create the first sample */
     if(request->monitoringMode == UA_MONITORINGMODE_REPORTING)
@@ -299,6 +306,13 @@ Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
         return;
     }
 
+    if((server->config.maxMonitoredItemsPerSubscription != 0) &&
+       ((UA_Subscription_getNumMonitoredItems(op_sub) + request->itemsToCreateSize) >
+        server->config.maxMonitoredItemsPerSubscription)) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYMONITOREDITEMS;
+        return;
+    }
+
     /* Reset the subscription lifetime */
     op_sub->currentLifetimeCount = 0;
 
@@ -442,6 +456,19 @@ Service_Publish(UA_Server *server, UA_Session *session,
         return;
     }
 
+    /* Handle too many subscriptions to free resources before trying to allocate
+     * resources for the new publish request. If the limit has been reached the
+     * oldest publish request shall be responded */
+    if((server->config.maxPublishReqPerSession != 0 ) &&
+       (UA_Session_getNumPublishReq(session) >= server->config.maxPublishReqPerSession)){
+        if(!UA_Subscription_reachedPublishReqLimit(server,session)) {
+            subscriptionSendError(session->channel, requestId,
+                                  request->requestHeader.requestHandle,
+                                  UA_STATUSCODE_BADINTERNALERROR);
+            return;
+        }
+    }
+
     UA_PublishResponseEntry *entry =
         (UA_PublishResponseEntry*)UA_malloc(sizeof(UA_PublishResponseEntry));
     if(!entry) {
@@ -487,7 +514,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
     }
 
     /* Queue the publish response */
-    SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
+    UA_Session_addPublishReq(session, entry);
     UA_LOG_DEBUG_SESSION(server->config.logger, session, "Queued a publication message");
 
     /* Answer immediately to a late subscription */

+ 64 - 4
src/server/ua_subscription.c

@@ -18,6 +18,7 @@ UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID) {
     /* Remaining members are covered by calloc zeroing out the memory */
     newItem->session = session;
     newItem->subscriptionID = subscriptionID;
+    newItem->numMonitoredItems = 0;
     newItem->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
     TAILQ_INIT(&newItem->retransmissionQueue);
     return newItem;
@@ -72,9 +73,21 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
     /* Remove the MonitoredItem */
     LIST_REMOVE(mon, listEntry);
     MonitoredItem_delete(server, mon);
+    sub->numMonitoredItems--;
     return UA_STATUSCODE_GOOD;
 }
 
+void
+UA_Subscription_addMonitoredItem(UA_Subscription *sub, UA_MonitoredItem *newMon) {
+    sub->numMonitoredItems++;
+    LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
+}
+
+UA_UInt32
+UA_Subscription_getNumMonitoredItems(UA_Subscription *sub) {
+    return sub->numMonitoredItems;
+}
+
 static size_t
 countQueuedNotifications(UA_Subscription *sub,
                          UA_Boolean *moreNotifications) {
@@ -250,7 +263,7 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         return;
 
     /* Dequeue a response */
-    UA_PublishResponseEntry *pre = SIMPLEQ_FIRST(&sub->session->responseQueue);
+    UA_PublishResponseEntry *pre = UA_Session_getPublishReq(sub->session);
 
     /* Cannot publish without a response */
     if(!pre) {
@@ -302,7 +315,7 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     /* <-- The point of no return --> */
 
     /* Remove the response from the response queue */
-    SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
+    UA_Session_removePublishReq(sub->session, pre);
 
     /* Set up the response */
     response->responseHeader.timestamp = UA_DateTime_now();
@@ -367,6 +380,53 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     }
 }
 
+UA_Boolean
+UA_Subscription_reachedPublishReqLimit(UA_Server *server,  UA_Session *session) {
+    UA_LOG_DEBUG_SESSION(server->config.logger, session,
+                         "Reached number of publish request limit");
+
+
+    /* Dequeue a response */
+    UA_PublishResponseEntry *pre = UA_Session_getPublishReq(session);
+
+    /* Cannot publish without a response */
+    if(!pre) {
+        UA_LOG_FATAL_SESSION(server->config.logger, session, "No publish requests available");
+        return false;
+    }
+
+    UA_PublishResponse *response = &pre->response;
+    UA_NotificationMessage *message = &response->notificationMessage;
+
+    /* <-- The point of no return --> */
+
+    /* Remove the response from the response queue */
+    UA_Session_removePublishReq(session, pre);
+
+    /* Set up the response. Note that this response has no related subscription id */
+    response->responseHeader.timestamp = UA_DateTime_now();
+    response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS;
+    response->subscriptionId = 0;
+    response->moreNotifications = false;
+    message->publishTime = response->responseHeader.timestamp;
+    message->sequenceNumber = 0;
+    response->availableSequenceNumbersSize = 0;
+
+    /* Send the response */
+    UA_LOG_DEBUG_SESSION(server->config.logger, session,
+                         "Sending out a publish response triggered by too many publish requests");
+    UA_SecureChannel_sendSymmetricMessage(session->channel, pre->requestId,
+                                          UA_MESSAGETYPE_MSG, response,
+                                          &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+
+    /* Free the response */
+    UA_Array_delete(response->results, response->resultsSize,
+                    &UA_TYPES[UA_TYPES_UINT32]);
+    UA_free(pre); /* no need for UA_PublishResponse_deleteMembers */
+
+    return true;
+}
+
 UA_StatusCode
 Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
@@ -417,8 +477,8 @@ UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server,
 
     /* Send a response for every queued request */
     UA_PublishResponseEntry *pre;
-    while((pre = SIMPLEQ_FIRST(&session->responseQueue))) {
-        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
+    while((pre = UA_Session_getPublishReq(session))) {
+        UA_Session_removePublishReq(session, pre);
         UA_PublishResponse *response = &pre->response;
         response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
         response->responseHeader.timestamp = UA_DateTime_now();

+ 9 - 1
src/server/ua_subscription.h

@@ -102,7 +102,7 @@ struct UA_Subscription {
     UA_UInt32 currentKeepAliveCount;
     UA_UInt32 currentLifetimeCount;
     UA_UInt32 lastMonitoredItemId;
-
+    UA_UInt32 numMonitoredItems;
     /* Publish Callback */
     UA_UInt64 publishCallbackId;
     UA_Boolean publishCallbackIsRegistered;
@@ -128,6 +128,12 @@ UA_StatusCode
 UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
                                     UA_UInt32 monitoredItemID);
 
+void
+UA_Subscription_addMonitoredItem(UA_Subscription *sub,
+                                 UA_MonitoredItem *newMon);
+UA_UInt32
+UA_Subscription_getNumMonitoredItems(UA_Subscription *sub);
+
 UA_MonitoredItem *
 UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID);
 
@@ -139,4 +145,6 @@ UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequ
 void
 UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_Session *session);
 
+UA_Boolean
+UA_Subscription_reachedPublishReqLimit(UA_Server *server,  UA_Session *session);
 #endif /* UA_SUBSCRIPTION_H_ */

+ 47 - 2
src/ua_session.c

@@ -33,6 +33,8 @@ UA_Session adminSession = {
     0, /* .lastSeenSubscriptionID */
     {NULL}, /* .serverSubscriptions */
     {NULL, NULL}, /* .responseQueue */
+    0, /* numSubscriptions */
+    0  /* numPublishReq */
 #endif
 };
 
@@ -55,6 +57,8 @@ void UA_Session_init(UA_Session *session) {
     session->lastSubscriptionID = 0;
     session->lastSeenSubscriptionID = 0;
     SIMPLEQ_INIT(&session->responseQueue);
+    session->numSubscriptions = 0;
+    session->numPublishReq = 0;
 #endif
 }
 
@@ -81,8 +85,8 @@ void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
         UA_free(currents);
     }
     UA_PublishResponseEntry *entry;
-    while((entry = SIMPLEQ_FIRST(&session->responseQueue))) {
-        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
+    while((entry = UA_Session_getPublishReq(session))) {
+        UA_Session_removePublishReq(session,entry);
         UA_PublishResponse_deleteMembers(&entry->response);
         UA_free(entry);
     }
@@ -97,6 +101,7 @@ void UA_Session_updateLifetime(UA_Session *session) {
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 
 void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscription) {
+    session->numSubscriptions++;
     LIST_INSERT_HEAD(&session->serverSubscriptions, newSubscription, listEntry);
 }
 
@@ -106,12 +111,25 @@ UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
     UA_Subscription *sub = UA_Session_getSubscriptionByID(session, subscriptionID);
     if(!sub)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+
     LIST_REMOVE(sub, listEntry);
     UA_Subscription_deleteMembers(sub, server);
     UA_free(sub);
+    if(session->numSubscriptions > 0) {
+        session->numSubscriptions--;
+    }
+    else {
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+
     return UA_STATUSCODE_GOOD;
 }
 
+UA_UInt32
+UA_Session_getNumSubscriptions( UA_Session *session ) {
+   return session->numSubscriptions;
+}
+
 UA_Subscription *
 UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID) {
     UA_Subscription *sub;
@@ -126,4 +144,31 @@ UA_UInt32 UA_Session_getUniqueSubscriptionID(UA_Session *session) {
     return ++(session->lastSubscriptionID);
 }
 
+UA_UInt32
+UA_Session_getNumPublishReq(UA_Session *session) {
+    return session->numPublishReq;
+}
+
+UA_PublishResponseEntry*
+UA_Session_getPublishReq(UA_Session *session) {
+    return SIMPLEQ_FIRST(&session->responseQueue);
+}
+
+void
+UA_Session_removePublishReq( UA_Session *session, UA_PublishResponseEntry* entry){
+    UA_PublishResponseEntry* firstEntry;
+    firstEntry = SIMPLEQ_FIRST(&session->responseQueue);
+
+    /* Remove the response from the response queue */
+    if((firstEntry != 0) && (firstEntry == entry)) {
+        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
+        session->numPublishReq--;
+    }
+}
+
+void UA_Session_addPublishReq( UA_Session *session, UA_PublishResponseEntry* entry){
+    SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
+    session->numPublishReq++;
+}
+
 #endif

+ 18 - 0
src/ua_session.h

@@ -59,6 +59,8 @@ struct UA_Session {
     UA_UInt32 lastSeenSubscriptionID;
     LIST_HEAD(UA_ListOfUASubscriptions, UA_Subscription) serverSubscriptions;
     SIMPLEQ_HEAD(UA_ListOfQueuedPublishResponses, UA_PublishResponseEntry) responseQueue;
+    UA_UInt32        numSubscriptions;
+    UA_UInt32        numPublishReq;
 #endif
 };
 
@@ -75,6 +77,9 @@ void UA_Session_updateLifetime(UA_Session *session);
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscription);
 
+UA_UInt32
+UA_Session_getNumSubscriptions( UA_Session *session );
+
 UA_Subscription *
 UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID);
 
@@ -84,6 +89,19 @@ UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
 
 UA_UInt32
 UA_Session_getUniqueSubscriptionID(UA_Session *session);
+
+UA_UInt32
+UA_Session_getNumPublishReq(UA_Session *session);
+
+UA_PublishResponseEntry*
+UA_Session_getPublishReq(UA_Session *session);
+
+void
+UA_Session_removePublishReq(UA_Session *session, UA_PublishResponseEntry* entry);
+
+void
+UA_Session_addPublishReq(UA_Session *session, UA_PublishResponseEntry* entry);
+
 #endif
 
 /**