Browse Source

Basic infrastructure for createing subscriptions updated by a timed work item (WIP).

ichrispa 9 years ago
parent
commit
dc40af607e

+ 3 - 1
src/server/ua_services_subscription.c

@@ -18,7 +18,7 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
     UA_Subscription *newSubscription;
     
     // Create Subscription and Response
-    response->subscriptionId = ++(session->subscriptionManager.LastSessionID);
+    response->subscriptionId = SubscriptionManager_getUniqueUIntID(&(session->subscriptionManager));
     newSubscription = UA_Subscription_new(response->subscriptionId);
     
     UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalPublishingInterval,
@@ -43,6 +43,8 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
     newSubscription->PublishingMode          = request->publishingEnabled;
     newSubscription->Priority                = request->priority;
     
+    UA_Guid jobId = SubscriptionManager_getUniqueGUID(&(session->subscriptionManager));
+    Subscriptions_createdUpdateJob(server, jobId, newSubscription);
     SubscriptionManager_addSubscription(&(session->subscriptionManager), newSubscription);    
 }
 

+ 65 - 2
src/server/ua_subscription.c

@@ -13,11 +13,41 @@ UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
     new->SubscriptionID = SubscriptionID;
     new->LastPublished  = 0;
     new->SequenceNumber = 0;
+    memset(&new->timedUpdateJobGuid, 0, sizeof(UA_Guid));
+    new->timedUpdateJob     = UA_NULL;
     LIST_INIT(&new->MonitoredItems);
     LIST_INIT(&new->unpublishedNotifications);
     return new;
 }
 
+void UA_Subscription_deleteMembers(UA_Subscription *subscription) {
+    UA_unpublishedNotification *not;
+    UA_MonitoredItem *mon;
+    
+    // Just in case any parallel process attempts to acces this subscription
+    // while we are deleting it... make it vanish.
+    subscription->SubscriptionID = 0;
+    
+    // Delete monitored Items
+    while(subscription->MonitoredItems.lh_first != NULL) {
+        mon = subscription->MonitoredItems.lh_first;
+        LIST_REMOVE(mon, listEntry);
+        MonitoredItem_delete(mon);
+    }
+    
+    // Delete unpublished Notifications
+    while(subscription->unpublishedNotifications.lh_first !=  NULL) {
+        not = subscription->unpublishedNotifications.lh_first;
+        LIST_REMOVE(not, listEntry);
+        Subscription_deleteUnpublishedNotification(not->notification->sequenceNumber, subscription);
+    }
+    
+    // Unhook/Unregister any timed work assiociated with this subscription
+    if (subscription->timedUpdateJob != UA_NULL)
+        UA_free(subscription->timedUpdateJob);
+    // TODO: Server_removeTimedWork(timedUpdateJob)
+}
+
 UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
     if (!subscription)
         return 0;
@@ -198,10 +228,11 @@ void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Sub
 }
 
 UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
-    UA_unpublishedNotification *not;
+    UA_unpublishedNotification *not, *nxtnot = UA_NULL;
     UA_UInt32 deletedItems = 0;
   
-    for(not=sub->unpublishedNotifications.lh_first; not != NULL; not=not->listEntry.le_next) {
+    for(not=sub->unpublishedNotifications.lh_first; not != NULL; not=nxtnot) {
+        nxtnot = not->listEntry.le_next;
         if(not->notification->sequenceNumber != seqNo)
             continue;
         LIST_REMOVE(not, listEntry);
@@ -213,12 +244,44 @@ UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscri
             }
             UA_free(not->notification);
         }
+        
         UA_free(not);
         deletedItems++;
     }
     return deletedItems;
 }
 
+
+static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data) {
+    // Timed-Worker/Job Version of updateNotifications
+    return;
+}
+
+
+UA_StatusCode Subscriptions_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub) {
+    if (server == UA_NULL || sub == UA_NULL)
+        return UA_STATUSCODE_BADSERVERINDEXINVALID;
+    
+    UA_Job *theWork;
+    theWork = (UA_Job *) malloc(sizeof(UA_Job));
+    if (!theWork)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    
+   /*
+    *   UA_Job theWork = {.type = UA_JOBTYPE_METHODCALL, 
+    *                     .job.methodCall = { .method = Subscription_timedUpdateNotificationsJob, .data = sub } };
+    */
+   *theWork = (UA_Job) {  .type = UA_JOBTYPE_METHODCALL,
+                          .job.methodCall = {.method = Subscription_timedUpdateNotificationsJob, .data = NULL} };
+   
+   sub->timedUpdateJobGuid = jobId;
+   sub->timedUpdateJob     = theWork;
+   
+   //UA_Server_addRepeatedJob(server, );
+   
+   return UA_STATUSCODE_GOOD;
+}
+
 /*****************/
 /* MonitoredItem */
 /*****************/

+ 5 - 0
src/server/ua_subscription.h

@@ -83,17 +83,22 @@ typedef struct UA_Subscription_s {
     UA_Boolean                          PublishingMode;
     UA_UInt32                           Priority;
     UA_UInt32                           SequenceNumber;
+    UA_Guid                             timedUpdateJobGuid;
+    UA_Job                              *timedUpdateJob;
     LIST_ENTRY(UA_Subscription_s)       listEntry;
     LIST_HEAD(UA_ListOfUnpublishedNotifications, UA_unpublishedNotification_s) unpublishedNotifications;
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem_s) MonitoredItems;
 } UA_Subscription;
 
 UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID);
+void UA_Subscription_deleteMembers(UA_Subscription *subscription);
 void Subscription_updateNotifications(UA_Subscription *subscription);
 UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription);
 UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
 void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
 UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub);
 void Subscription_generateKeepAlive(UA_Subscription *subscription);
+UA_StatusCode Subscriptions_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub);
+static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data);
 
 #endif /* UA_SUBSCRIPTION_H_ */

+ 33 - 0
src/server/ua_subscription_manager.c

@@ -16,6 +16,22 @@ void SubscriptionManager_init(UA_Session *session) {
     manager->GlobalQueueSize = (UA_UInt32_BoundedValue) { .maxValue = 100, .minValue = 0, .currentValue=0 };
     LIST_INIT(&manager->ServerSubscriptions);
     manager->LastSessionID = (UA_UInt32) UA_DateTime_now();
+    
+    // Initialize a GUID with a 2^64 time dependant part, then fold the time in on itself to provide a more randomish
+    // Counter
+    unsigned long guidInitH = (UA_UInt64) UA_DateTime_now();
+    manager->LastJobGuid.data1 = (UA_UInt16) (guidInitH >> 32);
+    manager->LastJobGuid.data2 = (UA_UInt16) (guidInitH >> 16);
+    manager->LastJobGuid.data3 = (UA_UInt16) (guidInitH);
+    unsigned long guidInitL = (UA_UInt64) UA_DateTime_now();
+    manager->LastJobGuid.data4[0] = (UA_Byte) guidInitL;
+    manager->LastJobGuid.data4[1] = (UA_Byte) (guidInitL >> 8); 
+    manager->LastJobGuid.data4[2] = (UA_Byte) (guidInitL >> 16);
+    manager->LastJobGuid.data4[3] = (UA_Byte) (guidInitL >> 24);
+    manager->LastJobGuid.data4[4] = (UA_Byte) (manager->LastJobGuid.data4[0]) ^ (guidInitL >> 32);
+    manager->LastJobGuid.data4[5] = (UA_Byte) (manager->LastJobGuid.data4[0]) ^ (guidInitL >> 40);
+    manager->LastJobGuid.data4[6] = (UA_Byte) (manager->LastJobGuid.data4[1]) ^ (guidInitL >> 48);
+    manager->LastJobGuid.data4[7] = (UA_Byte) (manager->LastJobGuid.data4[0]) ^ (guidInitL >> 58);
 }
 
 void SubscriptionManager_deleteMembers(UA_Session *session) {
@@ -75,3 +91,20 @@ UA_Int32 SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager,
     UA_free(sub);
     return UA_STATUSCODE_GOOD;
 } 
+
+UA_UInt32 SubscriptionManager_getUniqueUIntID(UA_SubscriptionManager *manager) {
+    UA_UInt32 id = ++(manager->LastSessionID);
+    return id;
+}
+
+UA_Guid SubscriptionManager_getUniqueGUID(UA_SubscriptionManager *manager) {
+    UA_Guid id;
+    unsigned long *incremental;
+    
+    incremental = (unsigned long *) &manager->LastJobGuid.data4[0];
+    incremental++;
+    
+    UA_Guid_copy(&(manager->LastJobGuid), &id);
+    
+    return id;
+}

+ 3 - 0
src/server/ua_subscription_manager.h

@@ -15,6 +15,7 @@ typedef struct UA_SubscriptionManager {
     UA_UInt32_BoundedValue   GlobalSamplingInterval;
     UA_UInt32_BoundedValue   GlobalQueueSize;
     UA_Int32                 LastSessionID;
+    UA_Guid                  LastJobGuid;
     LIST_HEAD(UA_ListOfUASubscriptions, UA_Subscription_s) ServerSubscriptions;
 } UA_SubscriptionManager;
 
@@ -27,4 +28,6 @@ UA_Int32 SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager,
 UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID,
                                                  UA_UInt32 MonitoredItemID);
 
+UA_UInt32 SubscriptionManager_getUniqueUIntID(UA_SubscriptionManager *manager);
+UA_Guid SubscriptionManager_getUniqueGUID(UA_SubscriptionManager *manager);
 #endif /* UA_SUBSCRIPTION_MANAGER_H_ */