Browse Source

the subscription manager only manages a list of subscriptions -> move to session

Julius Pfrommer 9 years ago
parent
commit
04c0cff8fb

+ 1 - 3
CMakeLists.txt

@@ -243,12 +243,10 @@ set(generate_subscriptiontypes "")
 if(UA_ENABLE_SUBSCRIPTIONS)
   list(APPEND lib_sources ${PROJECT_SOURCE_DIR}/src/server/ua_services_subscription.c
                           ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.c
-                          ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_manager.c
                           ${PROJECT_SOURCE_DIR}/src/client/ua_client_highlevel_subscriptions.c)
   #append subscription headers at before ua_session
   list(FIND internal_headers "${PROJECT_SOURCE_DIR}/src/ua_session.h" UaSessionPos)
-  list(INSERT internal_headers  ${UaSessionPos} ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.h
-                                                ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_manager.h)
+  list(INSERT internal_headers  ${UaSessionPos} ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.h)
   set(generate_subscriptiontypes "--enable-subscription-types=1")
 endif()
 

+ 17 - 27
src/server/ua_services_subscription.c

@@ -1,6 +1,5 @@
 #include "ua_server_internal.h"
 #include "ua_services.h"
-#include "ua_subscription_manager.h"
 #include "ua_subscription.h"
 
 #define UA_BOUNDEDVALUE_SETWBOUNDS(BOUNDS, SRC, DST) { \
@@ -12,7 +11,7 @@
 void Service_CreateSubscription(UA_Server *server, UA_Session *session,
                                 const UA_CreateSubscriptionRequest *request,
                                 UA_CreateSubscriptionResponse *response) {
-    response->subscriptionId = SubscriptionManager_getUniqueUIntID(&session->subscriptionManager);
+    response->subscriptionId = UA_Session_getUniqueSubscriptionID(session);
     UA_Subscription *newSubscription = UA_Subscription_new(response->subscriptionId);
     if(!newSubscription) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
@@ -47,7 +46,7 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
     
     /* add the update job */
     Subscription_registerUpdateJob(server, newSubscription);
-    SubscriptionManager_addSubscription(&session->subscriptionManager, newSubscription);    
+    UA_Session_addSubscription(session, newSubscription);    
 }
 
 static void
@@ -72,7 +71,7 @@ createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subscription *su
         return;
     }
 
-    newMon->itemId = ++(session->subscriptionManager.lastSessionID);
+    newMon->itemId = UA_Session_getUniqueSubscriptionID(session);
     result->monitoredItemId = newMon->itemId;
     newMon->clientHandle = request->requestedParameters.clientHandle;
 
@@ -102,8 +101,7 @@ createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subscription *su
 void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
                                   const UA_CreateMonitoredItemsRequest *request,
                                   UA_CreateMonitoredItemsResponse *response) {
-    UA_Subscription  *sub = SubscriptionManager_getSubscriptionByID(&session->subscriptionManager,
-                                                                    request->subscriptionId);
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
     if(!sub) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
         return;
@@ -126,12 +124,8 @@ void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
 }
 
 void
-Service_Publish(UA_Server *server, UA_Session *session,
-                const UA_PublishRequest *request, UA_UInt32 requestId) {
-    UA_SubscriptionManager *manager= &session->subscriptionManager;
-    if(!manager)
-        return;
-
+Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest *request,
+                UA_UInt32 requestId) {
     UA_PublishResponse response;
     UA_PublishResponse_init(&response);
     response.responseHeader.requestHandle = request->requestHeader.requestHandle;
@@ -143,7 +137,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
     for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; i++) {
         response.results[i] = UA_STATUSCODE_GOOD;
         UA_UInt32 sid = request->subscriptionAcknowledgements[i].subscriptionId;
-        UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, sid);
+        UA_Subscription *sub = UA_Session_getSubscriptionByID(session, sid);
         if(!sub) {
             response.results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
             continue;
@@ -157,7 +151,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
 
     // See if any new data is available
     UA_Subscription *sub;
-    LIST_FOREACH(sub, &manager->serverSubscriptions, listEntry) {
+    LIST_FOREACH(sub, &session->serverSubscriptions, listEntry) {
         if(sub->timedUpdateIsRegistered == false) {
             // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
             // NOTE:  There is a clone of this functionality in the Subscription_timedUpdateNotificationsJob
@@ -205,7 +199,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
         // request, but currently we need to return something to the client. If no
         // subscriptions have notifications, force one to generate a keepalive so we
         // don't return an empty message
-        sub = LIST_FIRST(&manager->serverSubscriptions);
+        sub = LIST_FIRST(&session->serverSubscriptions);
         if(sub) {
             response.subscriptionId = sub->subscriptionID;
             sub->keepAliveCount.current=sub->keepAliveCount.min;
@@ -226,8 +220,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
 void
 Service_ModifySubscription(UA_Server *server, UA_Session *session, const UA_ModifySubscriptionRequest *request,
                            UA_ModifySubscriptionResponse *response) {
-    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(&session->subscriptionManager,
-                                                                   request->subscriptionId);
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
     if(!sub) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
         return;
@@ -271,15 +264,13 @@ void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
 
     for(size_t i = 0; i < request->subscriptionIdsSize; i++)
         response->results[i] =
-            SubscriptionManager_deleteSubscription(server, &session->subscriptionManager,
-                                                   request->subscriptionIds[i]);
+            UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
 } 
 
 void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
                                   const UA_DeleteMonitoredItemsRequest *request,
                                   UA_DeleteMonitoredItemsResponse *response) {
-    UA_SubscriptionManager *manager = &session->subscriptionManager;
-    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, request->subscriptionId);
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
     if(!sub) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
         return;
@@ -294,14 +285,13 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
 
     for(size_t i = 0; i < request->monitoredItemIdsSize; i++)
         response->results[i] =
-            SubscriptionManager_deleteMonitoredItem(manager, sub->subscriptionID,
-                                                    request->monitoredItemIds[i]);
+            UA_Session_deleteMonitoredItem(session, sub->subscriptionID,
+                                           request->monitoredItemIds[i]);
 }
 
-void Service_Republish(UA_Server *server, UA_Session *session,
-                       const UA_RepublishRequest *request, UA_RepublishResponse *response) {
-    UA_SubscriptionManager *manager = &session->subscriptionManager;
-    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, request->subscriptionId);
+void Service_Republish(UA_Server *server, UA_Session *session, const UA_RepublishRequest *request,
+                       UA_RepublishResponse *response) {
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
     if (!sub) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
         return;

+ 3 - 2
src/server/ua_subscription.h

@@ -5,6 +5,7 @@
 #include "ua_types.h"
 #include "ua_types_generated.h"
 #include "ua_nodes.h"
+#include "ua_session.h"
 
 /*****************/
 /* MonitoredItem */
@@ -59,7 +60,7 @@ typedef struct UA_unpublishedNotification {
     UA_NotificationMessage notification;
 } UA_unpublishedNotification;
 
-typedef struct UA_Subscription {
+struct UA_Subscription {
     LIST_ENTRY(UA_Subscription) listEntry;
     UA_BoundedUInt32 lifeTime;
     UA_BoundedUInt32 keepAliveCount;
@@ -75,7 +76,7 @@ typedef struct UA_Subscription {
     LIST_HEAD(UA_ListOfUnpublishedNotifications, UA_unpublishedNotification) unpublishedNotifications;
     size_t unpublishedNotificationsSize;
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) MonitoredItems;
-} UA_Subscription;
+};
 
 UA_Subscription *UA_Subscription_new(UA_UInt32 subscriptionID);
 void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server);

+ 0 - 69
src/server/ua_subscription_manager.c

@@ -1,69 +0,0 @@
-#include "ua_types.h"
-#include "ua_server_internal.h"
-#include "ua_nodestore.h"
-#include "ua_subscription_manager.h"
-
-void SubscriptionManager_init(UA_Session *session) {
-    UA_SubscriptionManager *manager = &(session->subscriptionManager);
-    LIST_INIT(&manager->serverSubscriptions);
-    manager->lastSessionID = UA_UInt32_random();
-}
-
-void SubscriptionManager_deleteMembers(UA_Session *session, UA_Server *server) {
-    UA_SubscriptionManager *manager = &session->subscriptionManager;
-    UA_Subscription *current, *temp;
-    LIST_FOREACH_SAFE(current, &manager->serverSubscriptions, listEntry, temp) {
-        LIST_REMOVE(current, listEntry);
-        UA_Subscription_deleteMembers(current, server);
-        UA_free(current);
-    }
-}
-
-void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *newSubscription) {
-    LIST_INSERT_HEAD(&manager->serverSubscriptions, newSubscription, listEntry);
-}
-
-UA_Subscription *
-SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager, UA_UInt32 subscriptionID) {
-    UA_Subscription *sub;
-    LIST_FOREACH(sub, &manager->serverSubscriptions, listEntry) {
-        if(sub->subscriptionID == subscriptionID)
-            break;
-    }
-    return sub;
-}
-
-UA_StatusCode
-SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_UInt32 subscriptionID,
-                                        UA_UInt32 monitoredItemID) {
-    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, subscriptionID);
-    if(!sub)
-        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-    
-    UA_MonitoredItem *mon, *tmp_mon;
-    LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmp_mon) {
-        if(mon->itemId == monitoredItemID) {
-            LIST_REMOVE(mon, listEntry);
-            MonitoredItem_delete(mon);
-            return UA_STATUSCODE_GOOD;
-        }
-    }
-    return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
-}
-
-UA_StatusCode
-SubscriptionManager_deleteSubscription(UA_Server *server, UA_SubscriptionManager *manager,
-                                       UA_UInt32 subscriptionID) {
-    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, subscriptionID);    
-    if(!sub)
-        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-    LIST_REMOVE(sub, listEntry);
-    UA_Subscription_deleteMembers(sub, server);
-    UA_free(sub);
-    return UA_STATUSCODE_GOOD;
-} 
-
-UA_UInt32 SubscriptionManager_getUniqueUIntID(UA_SubscriptionManager *manager) {
-    UA_UInt32 id = ++(manager->lastSessionID);
-    return id;
-}

+ 0 - 28
src/server/ua_subscription_manager.h

@@ -1,28 +0,0 @@
-#ifndef UA_SUBSCRIPTION_MANAGER_H_
-#define UA_SUBSCRIPTION_MANAGER_H_
-
-#include "ua_server.h"
-#include "ua_types.h"
-#include "queue.h"
-#include "ua_nodestore.h"
-#include "ua_subscription.h"
-
-typedef struct UA_SubscriptionManager {
-    UA_UInt32 lastSessionID;
-    LIST_HEAD(UA_ListOfUASubscriptions, UA_Subscription) serverSubscriptions;
-} UA_SubscriptionManager;
-
-void SubscriptionManager_init(UA_Session *session);
-void SubscriptionManager_deleteMembers(UA_Session *session, UA_Server *server);
-void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *subscription);
-UA_Subscription *
-SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager, UA_UInt32 subscriptionID);
-UA_StatusCode
-SubscriptionManager_deleteSubscription(UA_Server *server, UA_SubscriptionManager *manager,
-                                       UA_UInt32 subscriptionID);
-UA_StatusCode
-SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_UInt32 subscriptionID,
-                                        UA_UInt32 monitoredItemID);
-
-UA_UInt32 SubscriptionManager_getUniqueUIntID(UA_SubscriptionManager *manager);
-#endif /* UA_SUBSCRIPTION_MANAGER_H_ */

+ 62 - 2
src/ua_session.c

@@ -1,4 +1,5 @@
 #include "ua_session.h"
+#include "server/ua_subscription.h"
 #include "ua_util.h"
 
 UA_Session adminSession = {
@@ -27,7 +28,8 @@ void UA_Session_init(UA_Session *session) {
     UA_DateTime_init(&session->validTill);
     session->channel = NULL;
 #ifdef UA_ENABLE_SUBSCRIPTIONS
-    SubscriptionManager_init(session);
+    LIST_INIT(&session->serverSubscriptions);
+    session->lastSubscriptionID = UA_UInt32_random();
 #endif
     session->availableContinuationPoints = MAXCONTINUATIONPOINTS;
     LIST_INIT(&session->continuationPoints);
@@ -48,10 +50,68 @@ void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
     if(session->channel)
         UA_SecureChannel_detachSession(session->channel, session);
 #ifdef UA_ENABLE_SUBSCRIPTIONS
-    SubscriptionManager_deleteMembers(session, server);
+    UA_Subscription *currents, *temps;
+    LIST_FOREACH_SAFE(currents, &session->serverSubscriptions, listEntry, temps) {
+        LIST_REMOVE(currents, listEntry);
+        UA_Subscription_deleteMembers(currents, server);
+        UA_free(currents);
+    }
 #endif
 }
 
 void UA_Session_updateLifetime(UA_Session *session) {
     session->validTill = UA_DateTime_now() + (UA_DateTime)(session->timeout * UA_MSEC_TO_DATETIME);
 }
+
+#ifdef UA_ENABLE_SUBSCRIPTIONS
+
+void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscription) {
+    LIST_INSERT_HEAD(&session->serverSubscriptions, newSubscription, listEntry);
+}
+
+UA_StatusCode
+UA_Session_deleteSubscription(UA_Server *server, UA_Session *session, UA_UInt32 subscriptionID) {
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, subscriptionID);    
+    if(!sub)
+        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+    LIST_REMOVE(sub, listEntry);
+    UA_Subscription_deleteMembers(sub, server);
+    UA_free(sub);
+    return UA_STATUSCODE_GOOD;
+} 
+
+UA_Subscription *
+UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID) {
+    UA_Subscription *sub;
+    LIST_FOREACH(sub, &session->serverSubscriptions, listEntry) {
+        if(sub->subscriptionID == subscriptionID)
+            break;
+    }
+    return sub;
+}
+
+
+UA_StatusCode
+UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
+                               UA_UInt32 monitoredItemID) {
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, subscriptionID);
+    if(!sub)
+        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+    
+    UA_MonitoredItem *mon, *tmp_mon;
+    LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmp_mon) {
+        if(mon->itemId == monitoredItemID) {
+            LIST_REMOVE(mon, listEntry);
+            MonitoredItem_delete(mon);
+            return UA_STATUSCODE_GOOD;
+        }
+    }
+    return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
+}
+
+UA_UInt32 UA_Session_getUniqueSubscriptionID(UA_Session *session) {
+    return ++(session->lastSubscriptionID);
+}
+
+
+#endif

+ 29 - 16
src/ua_session.h

@@ -8,16 +8,6 @@
 
 #define MAXCONTINUATIONPOINTS 5
 
-#ifdef UA_ENABLE_SUBSCRIPTIONS
-#include "server/ua_subscription_manager.h"
-#endif
-
-/**
- *  @ingroup communication
- *
- * @{
- */
-
 struct ContinuationPointEntry {
     LIST_ENTRY(ContinuationPointEntry) pointers;
     UA_ByteString        identifier;
@@ -26,6 +16,9 @@ struct ContinuationPointEntry {
     UA_UInt32            maxReferences;
 };
 
+struct UA_Subscription;
+typedef struct UA_Subscription UA_Subscription;
+
 struct UA_Session {
     UA_ApplicationDescription clientDescription;
     UA_Boolean        activated;
@@ -36,22 +29,42 @@ struct UA_Session {
     UA_UInt32         maxResponseMessageSize;
     UA_Double         timeout; // [ms]
     UA_DateTime       validTill;
-    #ifdef UA_ENABLE_SUBSCRIPTIONS
-        UA_SubscriptionManager subscriptionManager;
-    #endif
     UA_SecureChannel *channel;
     UA_UInt16 availableContinuationPoints;
     LIST_HEAD(ContinuationPointList, ContinuationPointEntry) continuationPoints;
+#ifdef UA_ENABLE_SUBSCRIPTIONS
+    UA_UInt32 lastSubscriptionID;
+    LIST_HEAD(UA_ListOfUASubscriptions, UA_Subscription) serverSubscriptions;
+#endif
 };
 
-extern UA_Session adminSession; ///< Local access to the services (for startup and maintenance) uses this Session with all possible access rights (Session ID: 1)
+/* Local access to the services (for startup and maintenance) uses this Session
+ * with all possible access rights (Session ID: 1) */
+extern UA_Session adminSession;
 
 void UA_Session_init(UA_Session *session);
 void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server *server);
 
-/** If any activity on a session happens, the timeout is extended */
+/* If any activity on a session happens, the timeout is extended */
 void UA_Session_updateLifetime(UA_Session *session);
 
-/** @} */
+#ifdef UA_ENABLE_SUBSCRIPTIONS
+void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscription);
+
+UA_Subscription *
+UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID);
+
+UA_StatusCode
+UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
+                               UA_UInt32 monitoredItemID);
+
+UA_StatusCode
+UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
+                              UA_UInt32 subscriptionID);
+
+UA_UInt32
+UA_Session_getUniqueSubscriptionID(UA_Session *session);
+#endif
+
 
 #endif /* UA_SESSION_H_ */