Browse Source

Fix subscription issues uncovered by the testing tools

Julius Pfrommer 8 years ago
parent
commit
eba0976ae4

+ 1 - 1
include/ua_server.h

@@ -115,7 +115,7 @@ typedef struct {
 	UA_DoubleRange publishingIntervalLimits;
 	UA_UInt32Range lifeTimeCountLimits;
 	UA_UInt32Range keepAliveCountLimits;
-	UA_UInt32Range notificationsPerPublishLimits;
+	UA_UInt32 maxNotificationsPerPublish;
 
 	/* Limits for monitoreditem settings */
     UA_DoubleRange samplingIntervalLimits;

+ 2 - 2
plugins/ua_config_standard.c

@@ -45,9 +45,9 @@ const UA_ServerConfig UA_ServerConfig_standard = {
     .usernamePasswordLoginsSize = 2,
     
 	.publishingIntervalLimits = { .min = 100.0, .max = 3600.0 * 1000.0 },
-    .lifeTimeCountLimits = { .max = 15000, .min = 1 },
+    .lifeTimeCountLimits = { .max = 15000, .min = 3 },
     .keepAliveCountLimits = { .max = 100, .min = 1 },
-    .notificationsPerPublishLimits = { .max = 1000, .min = 1 },
+    .maxNotificationsPerPublish = 1000,
 	.samplingIntervalLimits = { .min = 50.0, .max = 24.0 * 3600.0 * 1000.0 },
     .queueSizeLimits = { .max = 100, .min = 1 }
 };

+ 12 - 9
src/server/ua_server_worker.c

@@ -309,11 +309,18 @@ static UA_DateTime processRepeatedJobs(UA_Server *server, UA_DateTime current) {
             jobsCopy[i] = tw->jobs[i].job;
         dispatchJobs(server, jobsCopy, tw->jobsSize); // frees the job pointer
 #else
-        for(size_t i=0;i<tw->jobsSize;i++)
-            //processJobs may sort the list but dont delete entries
+		size_t size = tw->jobsSize;
+        for(size_t i = 0; i < size; i++)
             processJobs(server, &tw->jobs[i].job, 1); // does not free the job ptr
 #endif
 
+		/* Elements are removed only here. Check if empty. */
+		if(tw->jobsSize == 0) {
+			LIST_REMOVE(tw, pointers);
+			UA_free(tw);
+			continue;
+		}
+
         /* set the time for the next execution */
         tw->nextTime += tw->interval;
         if(tw->nextTime < current)
@@ -349,14 +356,10 @@ static void removeRepeatedJob(UA_Server *server, UA_Guid *jobId) {
         for(size_t i = 0; i < tw->jobsSize; i++) {
             if(!UA_Guid_equal(jobId, &tw->jobs[i].id))
                 continue;
-            if(tw->jobsSize == 1) {
-                LIST_REMOVE(tw, pointers);
-                UA_free(tw);
-            } else {
-                tw->jobsSize--;
+			tw->jobsSize--; /* if size == 0, tw is freed during the next processing */
+            if(tw->jobsSize > 0)
                 tw->jobs[i] = tw->jobs[tw->jobsSize]; // move the last entry to overwrite
-            }
-            goto finish; // ugly break
+            goto finish;
         }
     }
  finish:

+ 28 - 14
src/server/ua_services_subscription.c

@@ -24,10 +24,12 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
     UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
                                requestedMaxKeepAliveCount, subscription->maxKeepAliveCount);
     UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
-                               requestedLifetimeCount, subscription->lifeTime);
-    if(subscription->lifeTime < 3 * subscription->maxKeepAliveCount)
-        subscription->lifeTime = 3 * subscription->maxKeepAliveCount;
-    subscription->notificationsPerPublish = maxNotificationsPerPublish;
+                               requestedLifetimeCount, subscription->lifeTimeCount);
+    if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount)
+        subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount;
+	subscription->notificationsPerPublish = maxNotificationsPerPublish;
+	if(maxNotificationsPerPublish == 0 || maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
+		subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
     subscription->priority = priority;
     Subscription_registerPublishJob(server, subscription);
 }
@@ -47,8 +49,9 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
     setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
                             request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
                             request->maxNotificationsPerPublish, request->priority);
+    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount; /* immediately send the first response */
     response->revisedPublishingInterval = newSubscription->publishingInterval;
-    response->revisedLifetimeCount = newSubscription->lifeTime;
+    response->revisedLifetimeCount = newSubscription->lifeTimeCount;
     response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;
 }
 
@@ -65,7 +68,7 @@ void Service_ModifySubscription(UA_Server *server, UA_Session *session,
                             request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
                             request->maxNotificationsPerPublish, request->priority);
     response->revisedPublishingInterval = sub->publishingInterval;
-    response->revisedLifetimeCount = sub->lifeTime;
+    response->revisedLifetimeCount = sub->lifeTimeCount;
     response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
     return;
 }
@@ -73,7 +76,7 @@ void Service_ModifySubscription(UA_Server *server, UA_Session *session,
 void Service_SetPublishingMode(UA_Server *server, UA_Session *session,
 	const UA_SetPublishingModeRequest *request,	UA_SetPublishingModeResponse *response) {
 
-	if (request->subscriptionIdsSize <= 0) {
+	if(request->subscriptionIdsSize <= 0) {
 		response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
 		return;
 	}
@@ -231,8 +234,8 @@ void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
 }
 
 void
-Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest *request,
-                UA_UInt32 requestId) {
+Service_Publish(UA_Server *server, UA_Session *session,
+	            const UA_PublishRequest *request, UA_UInt32 requestId) {
 	/* Return an error if the session has no subscription */
 	if(LIST_EMPTY(&session->serverSubscriptions)) {
 		UA_PublishResponse response;
@@ -244,12 +247,12 @@ Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest
 		return;
 	}
 
-    // todo error handling for malloc
+	// todo error handling for malloc
     UA_PublishResponseEntry *entry = UA_malloc(sizeof(UA_PublishResponseEntry));
-    entry->requestId = requestId;
+	entry->requestId = requestId;
     UA_PublishResponse *response = &entry->response;
-    UA_PublishResponse_init(response);
-    response->responseHeader.requestHandle = request->requestHeader.requestHandle;
+	UA_PublishResponse_init(response);
+	response->responseHeader.requestHandle = request->requestHeader.requestHandle;
 
     /* Delete Acknowledged Subscription Messages */
     response->results = UA_malloc(request->subscriptionAcknowledgementsSize * sizeof(UA_StatusCode));
@@ -284,7 +287,18 @@ Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest
     /* Queue the publish response */
     SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
     UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
-                 "Queued a publication message on session %u", session->authenticationToken.identifier.numeric);
+        "Queued a publication message on session %u", session->authenticationToken.identifier.numeric);
+
+    /* Answer immediately to a late subscription */
+    UA_Subscription *immediate;
+    LIST_FOREACH(immediate, &session->serverSubscriptions, listEntry) {
+        if(immediate->state == UA_SUBSCRIPTIONSTATE_LATE) {
+            UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                "Response on a late subscription on session %u", session->authenticationToken.identifier.numeric);
+            UA_Subscription_publishCallback(server, immediate);
+            return;
+        }
+    }
 }
 
 void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,

+ 107 - 71
src/server/ua_subscription.c

@@ -52,7 +52,7 @@ static void SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
         return;
     UA_DataValue_init(&newvalue->value);
     newvalue->clientHandle = monitoredItem->clientHandle;
-    UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
                 "Creating a sample with client handle %u", newvalue->clientHandle);
   
     /* Read the value */
@@ -112,6 +112,7 @@ static void SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
 }
 
 UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
+    //SampleCallback(server, mon);
     UA_Job job = {.type = UA_JOBTYPE_METHODCALL,
                   .job.methodCall = {.method = (UA_ServerCallback)SampleCallback, .data = mon} };
     UA_StatusCode retval = UA_Server_addRepeatedJob(server, job, (UA_UInt32)mon->samplingInterval,
@@ -138,12 +139,14 @@ UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptio
         return NULL;
     new->session = session;
     new->subscriptionID = subscriptionID;
-    new->sequenceNumber = 1;
-    new->currentKeepAliveCount = 0;
+    new->sequenceNumber = 0;
     new->maxKeepAliveCount = 0;
 	new->publishingEnabled = false;
     memset(&new->publishJobGuid, 0, sizeof(UA_Guid));
     new->publishJobIsRegistered = false;
+	new->currentKeepAliveCount = 0;
+	new->currentLifetimeCount = 0;
+	new->state = UA_SUBSCRIPTIONSTATE_LATE; /* The first publish response is sent immediately */
     LIST_INIT(&new->retransmissionQueue);
     LIST_INIT(&new->MonitoredItems);
     return new;
@@ -192,26 +195,37 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
     return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
 }
 
-static void PublishCallback(UA_Server *server, UA_Subscription *sub) {
+void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     /* Count the available notifications */
     size_t notifications = 0;
-	if(sub->publishingEnabled) {
-		UA_MonitoredItem *mon;
-		LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
-			MonitoredItem_queuedValue *qv;
-			TAILQ_FOREACH(qv, &mon->queue, listEntry) {
-				if(notifications >= sub->notificationsPerPublish)
-					break;
-				notifications++;
-			}
-		}
-	}
+    UA_Boolean moreNotifications = false;
+    if(sub->publishingEnabled) {
+        UA_MonitoredItem *mon;
+        LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+            MonitoredItem_queuedValue *qv;
+            TAILQ_FOREACH(qv, &mon->queue, listEntry) {
+                if(notifications >= sub->notificationsPerPublish) {
+                    moreNotifications = true;
+                    break;
+                }
+                notifications++;
+            }
+        }
+    }
 
-    /* Continue only if we have data or want to send a keepalive */
+    /* Return if nothing to do */
     if(notifications == 0) {
         sub->currentKeepAliveCount++;
         if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
             return;
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+            "Sending out a keepalive on subscription %u on securechannel %u", sub->subscriptionID,
+            sub->session->authenticationToken.identifier.numeric);
+    } else {
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+            "Sending out a publish response on subscription %u on securechannel %u " \
+            "with %u notifications", sub->subscriptionID,
+            sub->session->authenticationToken.identifier.numeric, (UA_UInt32)notifications);
     }
 
     /* Check if the securechannel is valid */
@@ -219,52 +233,81 @@ static void PublishCallback(UA_Server *server, UA_Subscription *sub) {
     if(!channel)
         return;
 
-    /* Dequeue a response */
-    UA_PublishResponseEntry *pre = SIMPLEQ_FIRST(&sub->session->responseQueue);
-    if(!pre) {
-        UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
-                    "Cannot send a publish response on subscription %u " \
-                    "since the publish queue is empty on session %u",
-                    sub->subscriptionID, sub->session->authenticationToken.identifier.numeric);
-        return;
-    }
-    SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
+	/* Dequeue a response */
+	UA_PublishResponseEntry *pre = SIMPLEQ_FIRST(&sub->session->responseQueue);
+	if(!pre) {
+		UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+			"Cannot send a publish response on subscription %u " \
+			"since the publish queue is empty on session %u",
+			sub->subscriptionID, sub->session->authenticationToken.identifier.numeric);
+		if(sub->state != UA_SUBSCRIPTIONSTATE_LATE) {
+			sub->state = UA_SUBSCRIPTIONSTATE_LATE;
+		} else {
+			sub->currentLifetimeCount++;
+			if(sub->currentLifetimeCount > sub->lifeTimeCount) {
+				UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
+					"End of lifetime for subscription %u on session %u",
+					sub->subscriptionID, sub->session->authenticationToken.identifier.numeric);
+				UA_Session_deleteSubscription(server, sub->session, sub->subscriptionID);
+			}
+		}
+		return;
+	}
+	SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
 
-    /* Prepare the response */
     UA_PublishResponse *response = &pre->response;
+    UA_UInt32 requestId = pre->requestId;
+
+	/* We have a request. Reset state to normal. */
+	sub->state = UA_SUBSCRIPTIONSTATE_NORMAL;
+    sub->currentKeepAliveCount = 0;
+    sub->currentLifetimeCount = 0;
+
+	/* Prepare the response */
     response->responseHeader.timestamp = UA_DateTime_now();
     response->subscriptionId = sub->subscriptionID;
+	response->moreNotifications = moreNotifications;
     UA_NotificationMessage *message = &response->notificationMessage;
-    message->sequenceNumber = ++(sub->sequenceNumber);
     message->publishTime = response->responseHeader.timestamp;
-    message->notificationData = UA_ExtensionObject_new();
-    message->notificationDataSize = 1;
-    UA_ExtensionObject *data = message->notificationData;
-    UA_DataChangeNotification *dcn = UA_DataChangeNotification_new();
-    dcn->monitoredItems = UA_Array_new(notifications, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
-    dcn->monitoredItemsSize = notifications;
-    size_t l = 0;
-	UA_MonitoredItem *mon;
-    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
-        MonitoredItem_queuedValue *qv, *qv_tmp;
-        TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
-            if(notifications <= l)
-                break;
-            UA_MonitoredItemNotification *min = &dcn->monitoredItems[l];
-            min->clientHandle = qv->clientHandle;
-            min->value = qv->value;
-            TAILQ_REMOVE(&mon->queue, qv, listEntry);
-            UA_free(qv);
-            mon->currentQueueSize--;
-            l++;
+    if(notifications > 0) {
+        message->sequenceNumber = ++sub->sequenceNumber;
+
+        /* Collect the notification messages */
+        message->notificationData = UA_ExtensionObject_new();
+        message->notificationDataSize = 1;
+        UA_ExtensionObject *data = message->notificationData;
+        UA_DataChangeNotification *dcn = UA_DataChangeNotification_new();
+        dcn->monitoredItems = UA_Array_new(notifications, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
+        dcn->monitoredItemsSize = notifications;
+        size_t l = 0;
+        UA_MonitoredItem *mon;
+        LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+            MonitoredItem_queuedValue *qv, *qv_tmp;
+            TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
+                if(notifications <= l)
+                    break;
+                UA_MonitoredItemNotification *min = &dcn->monitoredItems[l];
+                min->clientHandle = qv->clientHandle;
+                min->value = qv->value;
+                TAILQ_REMOVE(&mon->queue, qv, listEntry);
+                UA_free(qv);
+                mon->currentQueueSize--;
+                l++;
+            }
         }
-    }
-    data->encoding = UA_EXTENSIONOBJECT_DECODED;
-    data->content.decoded.data = dcn;
-    data->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
+        data->encoding = UA_EXTENSIONOBJECT_DECODED;
+        data->content.decoded.data = dcn;
+        data->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
+
+        /* Put the notification message into the retransmission queue */
+        UA_NotificationMessageEntry *retransmission = malloc(sizeof(UA_NotificationMessageEntry));
+        retransmission->message = response->notificationMessage;
+        LIST_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
+    } else /* Send sequence number for the next notification */
+		message->sequenceNumber = sub->sequenceNumber + 1;
 
     /* Get the available sequence numbers from the retransmission queue */
-    size_t available = 1;
+    size_t available = 0;
     UA_NotificationMessageEntry *nme;
     LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry)
         available++;
@@ -275,31 +318,24 @@ static void PublishCallback(UA_Server *server, UA_Subscription *sub) {
         response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
         i++;
     }
-    response->availableSequenceNumbers[i] = message->sequenceNumber;
-    
-    /* send out the response */
-    UA_SecureChannel_sendBinaryMessage(channel, pre->requestId, response,
-                                       &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
-    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
-                 "Sending out a publish response on subscription %u on securechannel %u " \
-                 "with %u notifications", sub->subscriptionID,
-                 sub->session->authenticationToken.identifier.numeric, (UA_UInt32)notifications);
 
-    /* Reset the keepalive count */
-    sub->currentKeepAliveCount = 0;
+    /* Send the response */
+    UA_SecureChannel_sendBinaryMessage(sub->session->channel, requestId, response,
+                                       &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
 
-    /* Put the notification message into the retransmission queue and delete the response */
-    UA_NotificationMessageEntry *retransmission = malloc(sizeof(UA_NotificationMessageEntry));
-    retransmission->message = response->notificationMessage;
-    UA_NotificationMessage_init(&response->notificationMessage);
-    LIST_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
-    UA_PublishResponse_deleteMembers(response);
+    /* Remove the queued request */
+    UA_NotificationMessage_init(&response->notificationMessage); /* The notification message was put into the queue */
+    UA_PublishResponse_deleteMembers(&pre->response);
     UA_free(pre);
+
+    /* Repeat if there are more notifications to send */
+    if(moreNotifications)
+        UA_Subscription_publishCallback(server, sub);
 }
 
 UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
     UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
-                           .job.methodCall = {.method = (UA_ServerCallback)PublishCallback, .data = sub} };
+                           .job.methodCall = {.method = (UA_ServerCallback)UA_Subscription_publishCallback, .data = sub} };
     UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
                  "Adding a subscription with %i millisec interval", (int)sub->publishingInterval);
     UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,

+ 13 - 1
src/server/ua_subscription.h

@@ -65,12 +65,21 @@ typedef struct UA_NotificationMessageEntry {
     UA_NotificationMessage message;
 } UA_NotificationMessageEntry;
 
+/* We use only a subset of the states defined in the standard */
+typedef enum {
+	/* UA_SUBSCRIPTIONSTATE_CLOSED */
+	/* UA_SUBSCRIPTIONSTATE_CREATING */
+	UA_SUBSCRIPTIONSTATE_NORMAL,
+	UA_SUBSCRIPTIONSTATE_LATE,
+    UA_SUBSCRIPTIONSTATE_KEEPALIVE
+} UA_SubscriptionState;
+
 struct UA_Subscription {
     LIST_ENTRY(UA_Subscription) listEntry;
 
     /* Settings */
     UA_Session *session;
-    UA_UInt32 lifeTime;
+    UA_UInt32 lifeTimeCount;
     UA_UInt32 maxKeepAliveCount;
     UA_Double publishingInterval;     // [ms] 
     UA_UInt32 subscriptionID;
@@ -80,7 +89,9 @@ struct UA_Subscription {
     UA_UInt32 sequenceNumber;
 
     /* Runtime information */
+	UA_SubscriptionState state;
     UA_UInt32 currentKeepAliveCount;
+	UA_UInt32 currentLifetimeCount;
 
     /* Publish Job */
     UA_Guid publishJobGuid;
@@ -102,5 +113,6 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
 UA_MonitoredItem *
 UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID);
 
+void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub);
 
 #endif /* UA_SUBSCRIPTION_H_ */