|
@@ -12,7 +12,7 @@ UA_MonitoredItem * UA_MonitoredItem_new() {
|
|
new->subscription = NULL;
|
|
new->subscription = NULL;
|
|
new->currentQueueSize = 0;
|
|
new->currentQueueSize = 0;
|
|
new->maxQueueSize = 0;
|
|
new->maxQueueSize = 0;
|
|
- new->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY; // TODO: This is currently hardcoded;
|
|
|
|
|
|
+ new->monitoredItemType = UA_MONITOREDITEMTYPE_CHANGENOTIFY; /* currently hardcoded */
|
|
new->timestampsToReturn = UA_TIMESTAMPSTORETURN_SOURCE;
|
|
new->timestampsToReturn = UA_TIMESTAMPSTORETURN_SOURCE;
|
|
UA_String_init(&new->indexRange);
|
|
UA_String_init(&new->indexRange);
|
|
TAILQ_INIT(&new->queue);
|
|
TAILQ_INIT(&new->queue);
|
|
@@ -41,22 +41,22 @@ void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
|
|
}
|
|
}
|
|
|
|
|
|
static void SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
|
|
static void SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
|
|
- if(monitoredItem->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY) {
|
|
|
|
|
|
+ if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
"Cannot process a monitoreditem that is not a data change notification");
|
|
"Cannot process a monitoreditem that is not a data change notification");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
|
- "Sampling the value on monitoreditem %u", monitoredItem->itemId);
|
|
|
|
-
|
|
|
|
MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
|
|
MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
|
|
- if(!newvalue)
|
|
|
|
|
|
+ if(!newvalue) {
|
|
|
|
+ UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
|
+ "Skipped a sample due to lack of memory on monitoreditem %u", monitoredItem->itemId);
|
|
return;
|
|
return;
|
|
|
|
+ }
|
|
UA_DataValue_init(&newvalue->value);
|
|
UA_DataValue_init(&newvalue->value);
|
|
newvalue->clientHandle = monitoredItem->clientHandle;
|
|
newvalue->clientHandle = monitoredItem->clientHandle;
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
- "Creating a sample with client handle %u", newvalue->clientHandle);
|
|
|
|
|
|
+ "Sampling the value on monitoreditem %u", monitoredItem->itemId);
|
|
|
|
|
|
/* Read the value */
|
|
/* Read the value */
|
|
UA_ReadValueId rvid;
|
|
UA_ReadValueId rvid;
|
|
@@ -92,7 +92,7 @@ static void SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- /* do we have space? */
|
|
|
|
|
|
+ /* do we have space in the queue? */
|
|
if(monitoredItem->currentQueueSize >= monitoredItem->maxQueueSize) {
|
|
if(monitoredItem->currentQueueSize >= monitoredItem->maxQueueSize) {
|
|
if(!monitoredItem->discardOldest) {
|
|
if(!monitoredItem->discardOldest) {
|
|
// We cannot remove the oldest value and theres no queue space left. We're done here.
|
|
// We cannot remove the oldest value and theres no queue space left. We're done here.
|
|
@@ -222,14 +222,6 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
sub->currentKeepAliveCount++;
|
|
sub->currentKeepAliveCount++;
|
|
if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
|
|
if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
|
|
return;
|
|
return;
|
|
- UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
|
- "Sending out a keepalive on subscription %u on securechannel %u", sub->subscriptionID,
|
|
|
|
- sub->session->authenticationToken.identifier.numeric);
|
|
|
|
- } else {
|
|
|
|
- UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
|
- "Sending out a publish response on subscription %u on securechannel %u " \
|
|
|
|
- "with %u notifications", sub->subscriptionID,
|
|
|
|
- sub->session->authenticationToken.identifier.numeric, (UA_UInt32)notifications);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/* Check if the securechannel is valid */
|
|
/* Check if the securechannel is valid */
|
|
@@ -248,7 +240,7 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
sub->state = UA_SUBSCRIPTIONSTATE_LATE;
|
|
sub->state = UA_SUBSCRIPTIONSTATE_LATE;
|
|
} else {
|
|
} else {
|
|
sub->currentLifetimeCount++;
|
|
sub->currentLifetimeCount++;
|
|
- if(sub->currentLifetimeCount > sub->lifeTimeCount) {
|
|
|
|
|
|
+ if(sub->currentLifetimeCount >= sub->lifeTimeCount) {
|
|
UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
"End of lifetime for subscription %u on session %u",
|
|
"End of lifetime for subscription %u on session %u",
|
|
sub->subscriptionID, sub->session->authenticationToken.identifier.numeric);
|
|
sub->subscriptionID, sub->session->authenticationToken.identifier.numeric);
|
|
@@ -258,7 +250,6 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
|
|
SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
|
|
-
|
|
|
|
UA_PublishResponse *response = &pre->response;
|
|
UA_PublishResponse *response = &pre->response;
|
|
UA_UInt32 requestId = pre->requestId;
|
|
UA_UInt32 requestId = pre->requestId;
|
|
|
|
|
|
@@ -273,7 +264,11 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
response->moreNotifications = moreNotifications;
|
|
response->moreNotifications = moreNotifications;
|
|
UA_NotificationMessage *message = &response->notificationMessage;
|
|
UA_NotificationMessage *message = &response->notificationMessage;
|
|
message->publishTime = response->responseHeader.timestamp;
|
|
message->publishTime = response->responseHeader.timestamp;
|
|
- if(notifications > 0) {
|
|
|
|
|
|
+ if(notifications == 0) {
|
|
|
|
+ /* Send sequence number for the next notification */
|
|
|
|
+ message->sequenceNumber = sub->sequenceNumber + 1;
|
|
|
|
+ } else {
|
|
|
|
+ /* Increase the sequence number */
|
|
message->sequenceNumber = ++sub->sequenceNumber;
|
|
message->sequenceNumber = ++sub->sequenceNumber;
|
|
|
|
|
|
/* Collect the notification messages */
|
|
/* Collect the notification messages */
|
|
@@ -307,11 +302,10 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
UA_NotificationMessageEntry *retransmission = malloc(sizeof(UA_NotificationMessageEntry));
|
|
UA_NotificationMessageEntry *retransmission = malloc(sizeof(UA_NotificationMessageEntry));
|
|
retransmission->message = response->notificationMessage;
|
|
retransmission->message = response->notificationMessage;
|
|
LIST_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
|
|
LIST_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
|
|
- } else /* Send sequence number for the next notification */
|
|
|
|
- message->sequenceNumber = sub->sequenceNumber + 1;
|
|
|
|
|
|
+ }
|
|
|
|
|
|
/* Get the available sequence numbers from the retransmission queue */
|
|
/* Get the available sequence numbers from the retransmission queue */
|
|
- size_t available = 0;
|
|
|
|
|
|
+ size_t available = 0, i = 0;
|
|
UA_NotificationMessageEntry *nme;
|
|
UA_NotificationMessageEntry *nme;
|
|
LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry)
|
|
LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry)
|
|
available++;
|
|
available++;
|
|
@@ -319,18 +313,21 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
response->availableSequenceNumbers = UA_alloca(available * sizeof(UA_UInt32));
|
|
response->availableSequenceNumbers = UA_alloca(available * sizeof(UA_UInt32));
|
|
response->availableSequenceNumbersSize = available;
|
|
response->availableSequenceNumbersSize = available;
|
|
}
|
|
}
|
|
- size_t i = 0;
|
|
|
|
LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
|
|
LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
|
|
response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
|
|
response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
|
|
i++;
|
|
i++;
|
|
}
|
|
}
|
|
|
|
|
|
/* Send the response */
|
|
/* Send the response */
|
|
|
|
+ UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
|
+ "Sending out a publish response on subscription %u on securechannel %u " \
|
|
|
|
+ "with %u notifications", sub->subscriptionID,
|
|
|
|
+ sub->session->authenticationToken.identifier.numeric, (UA_UInt32)notifications);
|
|
UA_SecureChannel_sendBinaryMessage(sub->session->channel, requestId, response,
|
|
UA_SecureChannel_sendBinaryMessage(sub->session->channel, requestId, response,
|
|
&UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
&UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
|
|
|
/* Remove the queued request */
|
|
/* Remove the queued request */
|
|
- UA_NotificationMessage_init(&response->notificationMessage); /* The notification message was put into the queue and is not freed */
|
|
|
|
|
|
+ UA_NotificationMessage_init(&response->notificationMessage); /* message was copied to the queue */
|
|
response->availableSequenceNumbers = NULL; /* stack-allocated */
|
|
response->availableSequenceNumbers = NULL; /* stack-allocated */
|
|
response->availableSequenceNumbersSize = 0;
|
|
response->availableSequenceNumbersSize = 0;
|
|
UA_PublishResponse_deleteMembers(&pre->response);
|
|
UA_PublishResponse_deleteMembers(&pre->response);
|
|
@@ -343,7 +340,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
|
|
|
UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
|
|
UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
|
|
UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
|
|
UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
|
|
- .job.methodCall = {.method = (UA_ServerCallback)UA_Subscription_publishCallback, .data = sub} };
|
|
|
|
|
|
+ .job.methodCall = {.method = (UA_ServerCallback)UA_Subscription_publishCallback,
|
|
|
|
+ .data = sub} };
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
"Adding a subscription with %i millisec interval", (int)sub->publishingInterval);
|
|
"Adding a subscription with %i millisec interval", (int)sub->publishingInterval);
|
|
UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,
|
|
UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,
|
|
@@ -353,8 +351,8 @@ UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription
|
|
sub->publishJobIsRegistered = true;
|
|
sub->publishJobIsRegistered = true;
|
|
else
|
|
else
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
- "Could not register a subscription publication job with status code 0x%08x\n",
|
|
|
|
- retval);
|
|
|
|
|
|
+ "Could not register a subscription publication job " \
|
|
|
|
+ "with status code 0x%08x\n", retval);
|
|
return retval;
|
|
return retval;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -365,7 +363,7 @@ UA_StatusCode Subscription_unregisterPublishJob(UA_Server *server, UA_Subscripti
|
|
UA_StatusCode retval = UA_Server_removeRepeatedJob(server, sub->publishJobGuid);
|
|
UA_StatusCode retval = UA_Server_removeRepeatedJob(server, sub->publishJobGuid);
|
|
if(retval != UA_STATUSCODE_GOOD)
|
|
if(retval != UA_STATUSCODE_GOOD)
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
- "Could not remove a subscription publication job with status code 0x%08x\n",
|
|
|
|
- retval);
|
|
|
|
|
|
+ "Could not remove a subscription publication job " \
|
|
|
|
+ "with status code 0x%08x\n", retval);
|
|
return retval;
|
|
return retval;
|
|
}
|
|
}
|