Browse Source

Save Handling of Subscriptionlist and Workerjobs

Jörg Schüler-Maroldt 9 years ago
parent
commit
e58d410a5a

+ 12 - 7
src/server/ua_server_worker.c

@@ -325,14 +325,12 @@ UA_StatusCode UA_Server_addRepeatedJob(UA_Server *server, UA_Job job, UA_UInt32
 /* Returns the timeout until the next repeated job in ms */
 static UA_UInt16 processRepeatedJobs(UA_Server *server) {
     UA_DateTime current = UA_DateTime_now();
-    struct RepeatedJobs *next = LIST_FIRST(&server->repeatedJobs);
-    struct RepeatedJobs *tw = UA_NULL;
+    struct RepeatedJobs *tw = LIST_FIRST(&server->repeatedJobs);
 
-    while(next) {
-        tw = next;
+    while (tw) {
+        //tw = nextjob;
         if(tw->nextTime > current)
             break;
-        next = LIST_NEXT(tw, pointers);
 
 #ifdef UA_MULTITHREADING
         // copy the entry and insert at the new location
@@ -345,8 +343,13 @@ static UA_UInt16 processRepeatedJobs(UA_Server *server) {
             jobsCopy[i] = tw->jobs[i].job;
         dispatchJobs(server, jobsCopy, tw->jobsSize); // frees the job pointer
 #else
-        for(size_t i=0;i<tw->jobsSize;i++)
+        for (size_t i = 0; i < tw->jobsSize; i++)
+        {
+            // be carefull
+            // methodCall may sort the list but dont delete entries
             processJobs(server, &tw->jobs[i].job, 1); // does not free the job ptr
+            // LIST_NEXT(tw, pointers) is still vaid
+        }
 #endif
         tw->nextTime += tw->interval;
         struct RepeatedJobs *prevTw = tw; // after which tw do we insert?
@@ -356,10 +359,12 @@ static UA_UInt16 processRepeatedJobs(UA_Server *server) {
                 break;
             prevTw = n;
         }
-        if(prevTw != tw) {
+        struct RepeatedJobs *nextTw = LIST_NEXT(tw, pointers);
+        if (prevTw != tw) {
             LIST_REMOVE(tw, pointers);
             LIST_INSERT_AFTER(prevTw, tw, pointers);
         }
+        tw = nextTw;
     }
 
     // check if the next repeated job is sooner than the usual timeout

+ 6 - 6
src/server/ua_services_subscription.c

@@ -21,7 +21,7 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
     /* set the publishing interval */
     UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.globalPublishingInterval,
                                request->requestedPublishingInterval, response->revisedPublishingInterval);
-    newSubscription->publishingInterval = response->revisedPublishingInterval;
+    newSubscription->publishingInterval = (UA_DateTime)response->revisedPublishingInterval;
     
     /* set the subscription lifetime (deleted when no publish requests arrive within this time) */
     UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.globalLifeTimeCount,
@@ -83,7 +83,7 @@ static void createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subs
     UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.globalSamplingInterval,
                                request->requestedParameters.samplingInterval,
                                result->revisedSamplingInterval);
-    newMon->samplingInterval = result->revisedSamplingInterval;
+    newMon->samplingInterval = (UA_UInt32)result->revisedSamplingInterval;
 
     /* set the queue size */
     UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.globalQueueSize,
@@ -148,7 +148,7 @@ void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishReq
             response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
             continue;
         }
-        if(Subscription_deleteUnpublishedNotification(request->subscriptionAcknowledgements[i].sequenceNumber, sub) == 0)
+        if(Subscription_deleteUnpublishedNotification(request->subscriptionAcknowledgements[i].sequenceNumber, false, sub) == 0)
             response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
     }
     
@@ -176,7 +176,7 @@ void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishReq
             // If this is a keepalive message, its seqNo is the next seqNo to be used for an actual msg.
             response->availableSequenceNumbersSize = 0;
             // .. and must be deleted
-            Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, sub);
+            Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
         } else {
             response->availableSequenceNumbersSize = Subscription_queuedNotifications(sub);
             response->availableSequenceNumbers = Subscription_getAvailableSequenceNumbers(sub);
@@ -196,7 +196,7 @@ void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishReq
         sub->keepAliveCount.currentValue=sub->keepAliveCount.minValue;
         Subscription_generateKeepAlive(sub);
         Subscription_copyTopNotificationMessage(&response->notificationMessage, sub);
-        Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, sub);
+        Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
     }
     
     // FIXME: This should be in processMSG();
@@ -215,7 +215,7 @@ void Service_ModifySubscription(UA_Server *server, UA_Session *session,
     
     UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.globalPublishingInterval,
                                request->requestedPublishingInterval, response->revisedPublishingInterval);
-    sub->publishingInterval = response->revisedPublishingInterval;
+    sub->publishingInterval = (UA_DateTime)response->revisedPublishingInterval;
     
     UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.globalLifeTimeCount,
                                request->requestedLifetimeCount, response->revisedLifetimeCount);

+ 5 - 9
src/server/ua_subscription.c

@@ -22,7 +22,6 @@ UA_Subscription *UA_Subscription_new(UA_Int32 subscriptionID) {
 }
 
 void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server) {
-    UA_unpublishedNotification *not, *tmp_not;
     UA_MonitoredItem *mon, *tmp_mon;
     
     // Just in case any parallel process attempts to access this subscription
@@ -36,9 +35,7 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
     }
     
     // Delete unpublished Notifications
-    LIST_FOREACH_SAFE(not, &subscription->unpublishedNotifications, listEntry, tmp_not) {
-        Subscription_deleteUnpublishedNotification(not->notification->sequenceNumber, subscription);
-    }
+    Subscription_deleteUnpublishedNotification(0, true, subscription);
     
     // Unhook/Unregister any timed work assiociated with this subscription
     if(subscription->timedUpdateJob != UA_NULL){
@@ -80,7 +77,7 @@ void Subscription_generateKeepAlive(UA_Subscription *subscription) {
 void Subscription_updateNotifications(UA_Subscription *subscription) {
     UA_MonitoredItem *mon;
     //MonitoredItem_queuedValue *queuedValue;
-    UA_unpublishedNotification *msg = NULL, *tempmsg;
+    UA_unpublishedNotification *msg;
     UA_UInt32 monItemsChangeT = 0, monItemsStatusT = 0, monItemsEventT = 0;
     UA_DataChangeNotification *changeNotification;
     size_t notificationOffset;
@@ -105,8 +102,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
     // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
     if(Subscription_queuedNotifications(subscription) >= 10) {
         // Remove last entry
-        LIST_FOREACH_SAFE(msg, &subscription->unpublishedNotifications, listEntry, tempmsg)
-            Subscription_deleteUnpublishedNotification(msg->notification->sequenceNumber, subscription);
+        Subscription_deleteUnpublishedNotification(0, true, subscription);
     }
     
     if(monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
@@ -226,11 +222,11 @@ void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Sub
                        &dst->notificationData->body);
 }
 
-UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
+UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean bDeleteAll, UA_Subscription *sub) {
     UA_UInt32 deletedItems = 0;
     UA_unpublishedNotification *not, *tmp;
     LIST_FOREACH_SAFE(not, &sub->unpublishedNotifications, listEntry, tmp) {
-        if(not->notification->sequenceNumber != seqNo)
+        if (!bDeleteAll && not->notification->sequenceNumber != seqNo)
             continue;
         LIST_REMOVE(not, listEntry);
         if(not->notification) {

+ 3 - 3
src/server/ua_subscription.h

@@ -45,7 +45,7 @@ typedef struct UA_MonitoredItem {
     UA_NodeId monitoredNodeId; 
     UA_UInt32 attributeID;
     UA_UInt32 clientHandle;
-    UA_UInt32 samplingInterval;
+    UA_UInt32 samplingInterval; // [ms]
     UA_UInt32_BoundedValue queueSize;
     UA_Boolean discardOldest;
     UA_DateTime lastSampled;
@@ -77,7 +77,7 @@ typedef struct UA_Subscription {
     LIST_ENTRY(UA_Subscription) listEntry;
     UA_UInt32_BoundedValue lifeTime;
     UA_Int32_BoundedValue keepAliveCount;
-    UA_DateTime publishingInterval;
+    UA_DateTime publishingInterval;     // [ms] may be UA_Int32
     UA_DateTime lastPublished;
     UA_Int32 subscriptionID;
     UA_Int32 notificationsPerPublish;
@@ -97,7 +97,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription);
 UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription);
 UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
 void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
-UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub);
+UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean bDeleteAll, UA_Subscription *sub);
 void Subscription_generateKeepAlive(UA_Subscription *subscription);
 UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub);
 UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub);