Browse Source

Basic creation scaffolding for subscriptions added.

ichrispa 10 years ago
parent
commit
2b57246f47

+ 7 - 0
src/server/ua_server.c

@@ -6,6 +6,9 @@
 #include "ua_services.h"
 #include "ua_nodeids.h"
 
+#ifdef ENABLESUBSCRIPTIONS
+#include "ua_subscription_manager.h"
+#endif
 
 const UA_ServerConfig UA_ServerConfig_standard = {
         UA_TRUE,
@@ -304,6 +307,10 @@ UA_Server * UA_Server_new(UA_ServerConfig config) {
 
     // random seed
     server->random_seed = (UA_UInt32)UA_DateTime_now();
+    
+#ifdef ENABLESUBSCRIPTIONS
+    SubscriptionManager_init(server);
+#endif
 
     // networklayers
     server->networkLayers = UA_NULL;

+ 7 - 1
src/server/ua_server_binary.c

@@ -295,7 +295,7 @@ static void processMSG(UA_Connection *connection, UA_Server *server, const UA_By
         UA_ActivateSessionResponse_deleteMembers(&r);
         break;
     }
-
+    
     case UA_NS0ID_CLOSESESSIONREQUEST:
         INVOKE_SERVICE(CloseSession);
         break;
@@ -324,6 +324,12 @@ static void processMSG(UA_Connection *connection, UA_Server *server, const UA_By
         INVOKE_SERVICE(TranslateBrowsePathsToNodeIds);
         break;
 
+#ifdef ENABLESUBSCRIPTIONS    
+    case UA_NS0ID_CREATESUBSCRIPTIONREQUEST:
+        INVOKE_SERVICE(CreateSubscription);
+        break;
+#endif
+
     default: {
         UA_LOG_INFO(server->logger, UA_LOGGERCATEGORY_COMMUNICATION, "Unknown request: NodeId(ns=%d, i=%d)",
                     requestType.namespaceIndex, requestType.identifier.numeric);

+ 9 - 0
src/server/ua_server_internal.h

@@ -7,6 +7,10 @@
 #include "ua_securechannel_manager.h"
 #include "ua_nodestore.h"
 
+#ifdef ENABLESUBSCRIPTIONS
+#include "ua_subscription_manager.h"
+#endif
+
 #define PRODUCT_URI "http://open62541.org"
 #define ANONYMOUS_POLICY "open62541-anonymous-policy"
 #define USERNAME_POLICY "open62541-username-policy"
@@ -46,6 +50,11 @@ struct UA_Server {
     UA_SecureChannelManager secureChannelManager;
     UA_SessionManager sessionManager;
 
+    /* Subscriptions and Monitoring */
+#ifdef ENABLESUBSCRIPTIONS
+    UA_SubscriptionManager subscriptionManager;
+#endif
+    
     /* Address Space */
     UA_NodeStore *nodestore;
     size_t namespacesSize;

+ 2 - 2
src/server/ua_server_worker.c

@@ -225,7 +225,7 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
 #ifdef UA_MULTITHREADING
         if(tw->repetitionInterval > 0) {
             // copy the entry and insert at the new location
-            UA_WorkItem *workCopy = UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
+            UA_WorkItem *workCopy = (UA_WorkItem *) UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
             UA_memcpy(workCopy, tw->work, sizeof(UA_WorkItem) * tw->workSize);
             dispatchWork(server, tw->workSize, workCopy); // frees the work pointer
             tw->time += tw->repetitionInterval;
@@ -249,7 +249,7 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
         }
 #else
         // 1) Process the work since it is past its due date
-        processWork(server, tw->work, tw->workSize); // does not free the work
+        processWork(server, (UA_WorkItem *) tw->work, tw->workSize); // does not free the work
 
         // 2) If the work is repeated, add it back into the list. Otherwise remove it.
         if(tw->interval > 0) {

+ 15 - 13
src/server/ua_services.h

@@ -246,6 +246,7 @@ void Service_Write(UA_Server *server, UA_Session *session, const UA_WriteRequest
 // Service_DeleteMonitoredItems
 /** @} */
 
+#ifdef ENABLESUBSCRIPTIONS
 /**
  * @name Subscription Service Set
  *
@@ -253,24 +254,25 @@ void Service_Write(UA_Server *server, UA_Session *session, const UA_WriteRequest
  *
  * @{
  */
-// Service_CreateSubscription
-/* UA_Int32 Service_CreateSubscription(UA_Server *server, UA_Session *session, */
-/*                                     const UA_CreateSubscriptionRequest *request, */
-/*                                     UA_CreateSubscriptionResponse *response); */
-// Service_ModifySubscription
-// Service_SetPublishingMode
-/* UA_Int32 Service_SetPublishingMode(UA_Server *server, UA_Session *session, */
-/*                                    const UA_SetPublishingModeRequest *request, */
-/*                                    UA_SetPublishingModeResponse *response); */
-
-/* UA_Int32 Service_Publish(UA_Server *server, UA_Session *session, */
-/*                          const UA_PublishRequest *request, */
-/*                          UA_PublishResponse *response); */
+    
+UA_Int32 Service_CreateSubscription(UA_Server *server, UA_Session *session,
+                                    const UA_CreateSubscriptionRequest *request,
+                                    UA_CreateSubscriptionResponse *response);
+//~ Service_ModifySubscription
+//~ Service_SetPublishingMode
+//~ UA_Int32 Service_SetPublishingMode(UA_Server *server, UA_Session *session,
+                                    //~ const UA_SetPublishingModeRequest *request,
+                                    //~ UA_SetPublishingModeResponse *response);
+//~ 
+//~ UA_Int32 Service_Publish(UA_Server *server, UA_Session *session,
+                         //~ const UA_PublishRequest *request,
+                         //~ UA_PublishResponse *response);
 
 // Service_Republish
 // Service_TransferSubscription
 // Service_DeleteSubscription
 /** @} */
 /** @} */
+#endif
 
 #endif /* UA_SERVICES_H_ */

+ 38 - 0
src/server/ua_services_subscription.c

@@ -0,0 +1,38 @@
+#ifdef ENABLESUBSCRIPTIONS
+#include "ua_services.h"
+#include "ua_server_internal.h"
+#include "ua_subscription_manager.h"
+#include "ua_statuscodes.h"
+#include "ua_util.h"
+
+// Remove later, debugging only
+#include "ua_log.h"
+
+UA_Int32 Service_CreateSubscription(UA_Server *server, UA_Session *session,
+                                     const UA_CreateSubscriptionRequest *request,
+                                     UA_CreateSubscriptionResponse *response) {
+    UA_Subscription *newSubscription;
+    response->responseHeader.serviceResult = UA_STATUSCODE_GOOD;
+    
+    response->subscriptionId = ++(server->subscriptionManager.LastSessionID);
+    if      (request->requestedPublishingInterval > server->subscriptionManager.GlobalPublishingInterval.maxValue) response->revisedPublishingInterval = server->subscriptionManager.GlobalPublishingInterval.maxValue ;
+    else if (request->requestedPublishingInterval < server->subscriptionManager.GlobalPublishingInterval.minValue) response->revisedPublishingInterval = server->subscriptionManager.GlobalPublishingInterval.minValue ;
+    else response->revisedPublishingInterval = request->requestedPublishingInterval ;
+    
+    if      (request->requestedLifetimeCount > server->subscriptionManager.GlobalLifeTimeCount.maxValue) response->revisedLifetimeCount = server->subscriptionManager.GlobalLifeTimeCount.maxValue ;
+    else if (request->requestedLifetimeCount < server->subscriptionManager.GlobalLifeTimeCount.minValue) response->revisedLifetimeCount = server->subscriptionManager.GlobalLifeTimeCount.minValue ;
+    else response->revisedLifetimeCount = request->requestedLifetimeCount ;
+    
+    if      (request->requestedMaxKeepAliveCount > server->subscriptionManager.GlobalKeepAliveCount.maxValue) response->revisedMaxKeepAliveCount = server->subscriptionManager.GlobalKeepAliveCount.maxValue ;
+    else if (request->requestedMaxKeepAliveCount < server->subscriptionManager.GlobalKeepAliveCount.minValue) response->revisedMaxKeepAliveCount = server->subscriptionManager.GlobalKeepAliveCount.minValue ;
+    else response->revisedMaxKeepAliveCount = request->requestedMaxKeepAliveCount ;
+    
+    //maxNotificationsPerPublish ?
+    //Type??
+    newSubscription = UA_Subscription_new(response->subscriptionId);
+    SubscriptionManager_addSubscription(&(server->subscriptionManager), newSubscription);    
+    
+    return (UA_Int32) 0;
+}
+
+#endif //#ifdef ENABLESUBSCRIPTIONS

+ 42 - 0
src/server/ua_subscription_manager.c

@@ -0,0 +1,42 @@
+#ifdef ENABLESUBSCRIPTIONS
+#include "ua_types.h"
+#include "ua_server_internal.h"
+#include "ua_subscription_manager.h"
+
+void SubscriptionManager_init(UA_Server *server) {
+    UA_SubscriptionManager *manager = &(server->subscriptionManager);
+
+    manager->LastSessionID = (UA_UInt32) (server->random_seed + (UA_UInt32)UA_DateTime_now());
+    /* FIXME: These init values are empirical. Maybe they should be part
+     *        of the server config? */
+    manager->GlobalPublishingInterval = (UA_Int32_BoundedValue) { .maxValue = 100, .minValue = 0, .currentValue=0 };
+    manager->GlobalLifeTimeCount      = (UA_UInt32_BoundedValue) { .maxValue = 15000, .minValue = 0, .currentValue=0 };
+    manager->GlobalKeepAliveCount     = (UA_UInt32_BoundedValue) { .maxValue = 100, .minValue = 0, .currentValue=0 };
+    manager->GlobalNotificationsPerPublish = (UA_Int32_BoundedValue) { .maxValue = 1000, .minValue = 1, .currentValue=0 };
+    
+    LIST_INIT(manager->ServerSubscriptions);
+    return;
+}
+
+UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
+    UA_Subscription *new = (UA_Subscription *) malloc(sizeof(UA_Subscription));
+    
+    LIST_INIT(new->MonitoredItems);
+    return new;
+}
+
+void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *newSubscription) {
+    LIST_INSERT_HEAD(manager->ServerSubscriptions, newSubscription, listEntry);
+    
+    return;
+}
+
+void SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID) {
+    return;
+}
+
+void SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Subscription *subscription) {
+    return;
+}
+
+#endif //#ifdef ENABLESUBSCRIPTIONS

+ 56 - 0
src/server/ua_subscription_manager.h

@@ -0,0 +1,56 @@
+#ifdef ENABLESUBSCRIPTIONS
+#ifndef UA_SUBSCRIPTION_MANAGER_H_
+#define UA_SUBSCRIPTION_MANAGER_H_
+
+#include "ua_server.h"
+#include "ua_types.h"
+#include "queue.h"
+
+typedef struct {
+    UA_Int32 currentValue;
+    UA_Int32 minValue;
+    UA_Int32 maxValue;
+} UA_Int32_BoundedValue;
+
+typedef struct {
+    UA_UInt32 currentValue;
+    UA_UInt32 minValue;
+    UA_UInt32 maxValue;
+} UA_UInt32_BoundedValue;
+
+typedef struct UA_MonitoredItem_s {
+    UA_NodeId node;
+    LIST_ENTRY(UA_MonitoredItem_s) listEntry;
+} UA_MonitoredItem;
+
+
+typedef struct UA_Subscription_s {
+    UA_UInt32_BoundedValue LiveTime;
+    UA_UInt32_BoundedValue KeepAliveCount;
+    UA_Int32  PublishingInterval;
+    UA_Int32  SubscriptionID;
+    UA_Int32  NotificationsPerPublish;
+    UA_Byte   PublishingMode;
+    UA_UInt32 Priority;
+    LIST_ENTRY(UA_Subscription_s) listEntry;
+    LIST_HEAD(ListOfUAMonitoredItems, UA_MonitoredItem_s) *MonitoredItems;
+} UA_Subscription;
+
+
+typedef struct UA_SubscriptionManager_s {
+    UA_Int32_BoundedValue  GlobalPublishingInterval;
+    UA_UInt32_BoundedValue GlobalLifeTimeCount;
+    UA_UInt32_BoundedValue GlobalKeepAliveCount;
+    UA_Int32_BoundedValue  GlobalNotificationsPerPublish;
+    UA_Int32 LastSessionID;
+    LIST_HEAD(ListOfUASubscription, UA_Subscription_s) *ServerSubscriptions;
+} UA_SubscriptionManager;
+
+void SubscriptionManager_init(UA_Server *server);
+UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID);
+void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *subscription);
+void SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID);
+void SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Subscription *subscription);
+
+#endif  // ifndef... define UA_SUBSCRIPTION_MANAGER_H_
+#endif  // ifdef EnableSubscriptions ...