|
@@ -22,35 +22,33 @@
|
|
|
UA_Subscription *
|
|
|
UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId) {
|
|
|
/* Allocate the memory */
|
|
|
- UA_Subscription *newItem =
|
|
|
+ UA_Subscription *newSub =
|
|
|
(UA_Subscription*)UA_calloc(1, sizeof(UA_Subscription));
|
|
|
- if(!newItem)
|
|
|
+ if(!newSub)
|
|
|
return NULL;
|
|
|
|
|
|
/* Remaining members are covered by calloc zeroing out the memory */
|
|
|
- newItem->session = session;
|
|
|
- newItem->subscriptionId = subscriptionId;
|
|
|
- newItem->numMonitoredItems = 0;
|
|
|
- newItem->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
|
|
|
- TAILQ_INIT(&newItem->retransmissionQueue);
|
|
|
- newItem->lastTriggeredPublishCallback = UA_DateTime_nowMonotonic();
|
|
|
- return newItem;
|
|
|
+ newSub->session = session;
|
|
|
+ newSub->subscriptionId = subscriptionId;
|
|
|
+ newSub->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
|
|
|
+ TAILQ_INIT(&newSub->retransmissionQueue);
|
|
|
+ TAILQ_INIT(&newSub->notificationQueue);
|
|
|
+ return newSub;
|
|
|
}
|
|
|
|
|
|
void
|
|
|
UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
|
|
|
- UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | Delete the subscription",
|
|
|
- sub->subscriptionId);
|
|
|
+ UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
|
|
|
+ "Delete the subscription", sub->subscriptionId);
|
|
|
|
|
|
Subscription_unregisterPublishCallback(server, sub);
|
|
|
|
|
|
/* Delete monitored Items */
|
|
|
UA_MonitoredItem *mon, *tmp_mon;
|
|
|
LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmp_mon) {
|
|
|
- LIST_REMOVE(mon, listEntry);
|
|
|
MonitoredItem_delete(server, mon);
|
|
|
}
|
|
|
+ sub->monitoredItemsSize = 0;
|
|
|
|
|
|
/* Delete Retransmission Queue */
|
|
|
UA_NotificationMessageEntry *nme, *nme_tmp;
|
|
@@ -63,11 +61,10 @@ UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
|
|
|
}
|
|
|
|
|
|
UA_MonitoredItem *
|
|
|
-UA_Subscription_getMonitoredItem(UA_Subscription *sub,
|
|
|
- UA_UInt32 monitoredItemId) {
|
|
|
+UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId) {
|
|
|
UA_MonitoredItem *mon;
|
|
|
LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
|
|
|
- if(mon->itemId == monitoredItemId)
|
|
|
+ if(mon->monitoredItemId == monitoredItemId)
|
|
|
break;
|
|
|
}
|
|
|
return mon;
|
|
@@ -79,53 +76,24 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
|
|
|
/* Find the MonitoredItem */
|
|
|
UA_MonitoredItem *mon;
|
|
|
LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
|
|
|
- if(mon->itemId == monitoredItemId)
|
|
|
+ if(mon->monitoredItemId == monitoredItemId)
|
|
|
break;
|
|
|
}
|
|
|
if(!mon)
|
|
|
return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
|
|
|
|
|
|
/* Remove the MonitoredItem */
|
|
|
- LIST_REMOVE(mon, listEntry);
|
|
|
MonitoredItem_delete(server, mon);
|
|
|
- sub->numMonitoredItems--;
|
|
|
+ sub->monitoredItemsSize--;
|
|
|
return UA_STATUSCODE_GOOD;
|
|
|
}
|
|
|
|
|
|
void
|
|
|
UA_Subscription_addMonitoredItem(UA_Subscription *sub, UA_MonitoredItem *newMon) {
|
|
|
- sub->numMonitoredItems++;
|
|
|
+ sub->monitoredItemsSize++;
|
|
|
LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
|
|
|
}
|
|
|
|
|
|
-UA_UInt32
|
|
|
-UA_Subscription_getNumMonitoredItems(UA_Subscription *sub) {
|
|
|
- return sub->numMonitoredItems;
|
|
|
-}
|
|
|
-
|
|
|
-static size_t
|
|
|
-countQueuedNotifications(UA_Subscription *sub,
|
|
|
- UA_Boolean *moreNotifications) {
|
|
|
- if(!sub->publishingEnabled)
|
|
|
- return 0;
|
|
|
-
|
|
|
- size_t notifications = 0;
|
|
|
- UA_MonitoredItem *mon;
|
|
|
- LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
|
|
|
- MonitoredItem_queuedValue *qv;
|
|
|
- TAILQ_FOREACH(qv, &mon->queue, listEntry) {
|
|
|
- if(notifications >= sub->notificationsPerPublish) {
|
|
|
- *moreNotifications = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- /* Only count if the item was sampled before lastTriggeredPublishCallback or events */
|
|
|
- if((sub->lastTriggeredPublishCallback >= qv->sampledDateTime) || (mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY))
|
|
|
- ++notifications;
|
|
|
- }
|
|
|
- }
|
|
|
- return notifications;
|
|
|
-}
|
|
|
-
|
|
|
static void
|
|
|
UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub,
|
|
|
UA_NotificationMessageEntry *entry) {
|
|
@@ -146,8 +114,7 @@ UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub
|
|
|
}
|
|
|
|
|
|
UA_StatusCode
|
|
|
-UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub,
|
|
|
- UA_UInt32 sequenceNumber) {
|
|
|
+UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber) {
|
|
|
/* Find the retransmission message */
|
|
|
UA_NotificationMessageEntry *entry;
|
|
|
TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
|
|
@@ -165,55 +132,40 @@ UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub,
|
|
|
return UA_STATUSCODE_GOOD;
|
|
|
}
|
|
|
|
|
|
-static UA_MonitoredItem *
|
|
|
-selectFirstMonToIterate(UA_Subscription *sub) {
|
|
|
- UA_MonitoredItem *mon = LIST_FIRST(&sub->monitoredItems);
|
|
|
- if(sub->nextMonitoredItemIdToBrowse > 0) {
|
|
|
- while(mon) {
|
|
|
- if(mon->itemId == sub->nextMonitoredItemIdToBrowse)
|
|
|
- break;
|
|
|
- mon = LIST_NEXT(mon, listEntry);
|
|
|
- }
|
|
|
- if(!mon)
|
|
|
- mon = LIST_FIRST(&sub->monitoredItems);
|
|
|
- }
|
|
|
- return mon;
|
|
|
-}
|
|
|
-
|
|
|
/* Iterate over the monitoreditems of the subscription, starting at mon, and
|
|
|
* move notifications into the response. */
|
|
|
static void
|
|
|
-moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItem *mon,
|
|
|
- UA_MonitoredItemNotification *mins, size_t minsSize,
|
|
|
- size_t *pos) {
|
|
|
- MonitoredItem_queuedValue *qv, *qv_tmp;
|
|
|
- while(mon) {
|
|
|
- sub->nextMonitoredItemIdToBrowse = mon->itemId;
|
|
|
- TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
|
|
|
- if(*pos >= minsSize)
|
|
|
- return;
|
|
|
- /* Only move if the item was sampled before lastTriggeredPublishCallback or events */
|
|
|
- if((sub->lastTriggeredPublishCallback >= qv->sampledDateTime) || (mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)) {
|
|
|
- UA_MonitoredItemNotification *min = &mins[*pos];
|
|
|
- min->clientHandle = qv->clientHandle;
|
|
|
- if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
|
|
|
- min->value = qv->data.value;
|
|
|
- } else {
|
|
|
- /* TODO implementation for events */
|
|
|
- }
|
|
|
- TAILQ_REMOVE(&mon->queue, qv, listEntry);
|
|
|
- UA_free(qv);
|
|
|
- --mon->currentQueueSize;
|
|
|
- ++(*pos);
|
|
|
- }
|
|
|
+moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItemNotification *mins,
|
|
|
+ size_t minsSize) {
|
|
|
+ size_t pos = 0;
|
|
|
+ UA_Notification *notification, *notification_tmp;
|
|
|
+ TAILQ_FOREACH_SAFE(notification, &sub->notificationQueue, globalEntry, notification_tmp) {
|
|
|
+ if(pos >= minsSize)
|
|
|
+ return;
|
|
|
+
|
|
|
+ UA_MonitoredItem *mon = notification->mon;
|
|
|
+
|
|
|
+ /* Remove the notification from the queues */
|
|
|
+ TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
|
|
|
+ TAILQ_REMOVE(&mon->queue, notification, listEntry);
|
|
|
+ --mon->queueSize;
|
|
|
+ --sub->notificationQueueSize;
|
|
|
+
|
|
|
+ /* Move the content to the response */
|
|
|
+ UA_MonitoredItemNotification *min = &mins[pos];
|
|
|
+ min->clientHandle = mon->clientHandle;
|
|
|
+ if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
|
|
|
+ min->value = notification->data.value;
|
|
|
+ } else {
|
|
|
+ /* TODO implementation for events */
|
|
|
}
|
|
|
- mon = LIST_NEXT(mon, listEntry);
|
|
|
+ UA_free(notification);
|
|
|
+ ++pos;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
static UA_StatusCode
|
|
|
-prepareNotificationMessage(UA_Subscription *sub,
|
|
|
- UA_NotificationMessage *message,
|
|
|
+prepareNotificationMessage(UA_Subscription *sub, UA_NotificationMessage *message,
|
|
|
size_t notifications) {
|
|
|
/* Array of ExtensionObject to hold different kinds of notifications
|
|
|
* (currently only DataChangeNotifications) */
|
|
@@ -245,18 +197,7 @@ prepareNotificationMessage(UA_Subscription *sub,
|
|
|
|
|
|
/* Move notifications into the response .. the point of no return */
|
|
|
|
|
|
- /* Select the first monitoredItem or the first monitoreditem after the last
|
|
|
- * that was processed. */
|
|
|
- UA_MonitoredItem *mon = selectFirstMonToIterate(sub);
|
|
|
-
|
|
|
- /* Move notifications into the response */
|
|
|
- size_t l = 0;
|
|
|
- moveNotificationsFromMonitoredItems(sub, mon, dcn->monitoredItems, notifications, &l);
|
|
|
- if(l < notifications) {
|
|
|
- /* Not done. We skipped MonitoredItems. Restart at the beginning. */
|
|
|
- moveNotificationsFromMonitoredItems(sub, LIST_FIRST(&sub->monitoredItems),
|
|
|
- dcn->monitoredItems, notifications, &l);
|
|
|
- }
|
|
|
+ moveNotificationsFromMonitoredItems(sub, dcn->monitoredItems, notifications);
|
|
|
|
|
|
return UA_STATUSCODE_GOOD;
|
|
|
}
|
|
@@ -271,15 +212,20 @@ UA_Subscription_nextSequenceNumber(UA_UInt32 sequenceNumber) {
|
|
|
return nextSequenceNumber;
|
|
|
}
|
|
|
|
|
|
+static void
|
|
|
+publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
+ sub->readyNotifications = sub->notificationQueueSize;
|
|
|
+ UA_Subscription_publish(server, sub);
|
|
|
+}
|
|
|
+
|
|
|
void
|
|
|
-UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
- UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | Publish Callback",
|
|
|
- sub->subscriptionId);
|
|
|
+UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
|
|
|
+ UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
|
|
|
+ "Publish Callback", sub->subscriptionId);
|
|
|
/* Dequeue a response */
|
|
|
- UA_PublishResponseEntry *pre = UA_Session_getPublishReq(sub->session);
|
|
|
+ UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(sub->session);
|
|
|
if(pre) {
|
|
|
- sub->currentLifetimeCount = 0; /* Reset the lifetimecounter */
|
|
|
+ sub->currentLifetimeCount = 0; /* Reset the LifetimeCounter */
|
|
|
} else {
|
|
|
UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
"Subscription %u | The publish queue is empty",
|
|
@@ -296,27 +242,42 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if (sub->readyNotifications > sub->notificationQueueSize)
|
|
|
+ sub->readyNotifications = sub->notificationQueueSize;
|
|
|
+
|
|
|
/* Count the available notifications */
|
|
|
+ UA_UInt32 notifications = sub->readyNotifications;
|
|
|
+ if(!sub->publishingEnabled)
|
|
|
+ notifications = 0;
|
|
|
+
|
|
|
UA_Boolean moreNotifications = false;
|
|
|
- size_t notifications = countQueuedNotifications(sub, &moreNotifications);
|
|
|
+ if(notifications > sub->notificationsPerPublish) {
|
|
|
+ notifications = sub->notificationsPerPublish;
|
|
|
+ moreNotifications = true;
|
|
|
+ }
|
|
|
|
|
|
/* Return if no notifications and no keepalive */
|
|
|
if(notifications == 0) {
|
|
|
++sub->currentKeepAliveCount;
|
|
|
- if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
|
|
|
+ if(sub->currentKeepAliveCount < sub->maxKeepAliveCount) {
|
|
|
+ if(pre)
|
|
|
+ UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
|
|
|
return;
|
|
|
+ }
|
|
|
UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
"Subscription %u | Sending a KeepAlive",
|
|
|
sub->subscriptionId);
|
|
|
}
|
|
|
|
|
|
- /* Can we send a response? */
|
|
|
+ /* We want to send a response. Is it possible? */
|
|
|
UA_SecureChannel *channel = sub->session->header.channel;
|
|
|
if(!channel || !pre) {
|
|
|
UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | Want to send a publish response;"
|
|
|
- "but not possible", sub->subscriptionId);
|
|
|
+ "Subscription %u | Want to send a publish response but can't. "
|
|
|
+ "The subscription is late.", sub->subscriptionId);
|
|
|
sub->state = UA_SUBSCRIPTIONSTATE_LATE;
|
|
|
+ if(pre)
|
|
|
+ UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -326,13 +287,13 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
UA_NotificationMessageEntry *retransmission = NULL;
|
|
|
if(notifications > 0) {
|
|
|
/* Allocate the retransmission entry */
|
|
|
- retransmission = (UA_NotificationMessageEntry*)
|
|
|
- UA_malloc(sizeof(UA_NotificationMessageEntry));
|
|
|
+ retransmission = (UA_NotificationMessageEntry*)UA_malloc(sizeof(UA_NotificationMessageEntry));
|
|
|
if(!retransmission) {
|
|
|
UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | Could not allocate memory "
|
|
|
- "for retransmission", sub->subscriptionId);
|
|
|
+ "Subscription %u | Could not allocate memory for retransmission. "
|
|
|
+ "The subscription is late.", sub->subscriptionId);
|
|
|
sub->state = UA_SUBSCRIPTIONSTATE_LATE;
|
|
|
+ UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -340,18 +301,20 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
UA_StatusCode retval = prepareNotificationMessage(sub, message, notifications);
|
|
|
if(retval != UA_STATUSCODE_GOOD) {
|
|
|
UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | Could not prepare the "
|
|
|
- "notification message", sub->subscriptionId);
|
|
|
+ "Subscription %u | Could not prepare the notification message. "
|
|
|
+ "The subscription is late.", sub->subscriptionId);
|
|
|
UA_free(retransmission);
|
|
|
sub->state = UA_SUBSCRIPTIONSTATE_LATE;
|
|
|
+ UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/* <-- The point of no return --> */
|
|
|
|
|
|
- /* Remove the response from the response queue */
|
|
|
- UA_Session_removePublishReq(sub->session, pre);
|
|
|
+ /* Adjust the number of ready notifications */
|
|
|
+ UA_assert(sub->readyNotifications >= notifications);
|
|
|
+ sub->readyNotifications -= notifications;
|
|
|
|
|
|
/* Set up the response */
|
|
|
response->responseHeader.timestamp = UA_DateTime_now();
|
|
@@ -376,9 +339,9 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
|
|
|
/* Get the available sequence numbers from the retransmission queue */
|
|
|
size_t available = sub->retransmissionQueueSize;
|
|
|
+ UA_STACKARRAY(UA_UInt32, seqNumbers, available);
|
|
|
if(available > 0) {
|
|
|
- response->availableSequenceNumbers =
|
|
|
- (UA_UInt32*)UA_alloca(available * sizeof(UA_UInt32));
|
|
|
+ response->availableSequenceNumbers = seqNumbers;
|
|
|
response->availableSequenceNumbersSize = available;
|
|
|
size_t i = 0;
|
|
|
UA_NotificationMessageEntry *nme;
|
|
@@ -397,40 +360,25 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
UA_MESSAGETYPE_MSG, response,
|
|
|
&UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
|
|
|
- /* Reset subscription state to normal. */
|
|
|
+ /* Reset subscription state to normal */
|
|
|
sub->state = UA_SUBSCRIPTIONSTATE_NORMAL;
|
|
|
sub->currentKeepAliveCount = 0;
|
|
|
|
|
|
/* Free the response */
|
|
|
- UA_Array_delete(response->results, response->resultsSize,
|
|
|
- &UA_TYPES[UA_TYPES_UINT32]);
|
|
|
- UA_free(pre); /* no need for UA_PublishResponse_deleteMembers */
|
|
|
+ UA_Array_delete(response->results, response->resultsSize, &UA_TYPES[UA_TYPES_UINT32]);
|
|
|
+ UA_free(pre); /* No need for UA_PublishResponse_deleteMembers */
|
|
|
|
|
|
- if(!moreNotifications) {
|
|
|
- /* All notifications were sent. The next time, just start at the first
|
|
|
- * monitoreditem. */
|
|
|
- sub->nextMonitoredItemIdToBrowse = 0;
|
|
|
- } else {
|
|
|
- /* Repeat sending responses right away if there are more notifications
|
|
|
- * to send */
|
|
|
- UA_Subscription_publishCallback(server, sub);
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-static void
|
|
|
-UA_Subscription_publishTriggeredCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
- sub->lastTriggeredPublishCallback = UA_DateTime_nowMonotonic();
|
|
|
- UA_Subscription_publishCallback(server, sub);
|
|
|
+ /* Repeat sending responses if there are more notifications to send */
|
|
|
+ if(moreNotifications)
|
|
|
+ UA_Subscription_publish(server, sub);
|
|
|
}
|
|
|
|
|
|
UA_Boolean
|
|
|
UA_Subscription_reachedPublishReqLimit(UA_Server *server, UA_Session *session) {
|
|
|
- UA_LOG_DEBUG_SESSION(server->config.logger, session,
|
|
|
- "Reached number of publish request limit");
|
|
|
-
|
|
|
+ UA_LOG_DEBUG_SESSION(server->config.logger, session, "Reached number of publish request limit");
|
|
|
|
|
|
/* Dequeue a response */
|
|
|
- UA_PublishResponseEntry *pre = UA_Session_getPublishReq(session);
|
|
|
+ UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(session);
|
|
|
|
|
|
/* Cannot publish without a response */
|
|
|
if(!pre) {
|
|
@@ -438,13 +386,10 @@ UA_Subscription_reachedPublishReqLimit(UA_Server *server, UA_Session *session)
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- UA_PublishResponse *response = &pre->response;
|
|
|
- UA_NotificationMessage *message = &response->notificationMessage;
|
|
|
-
|
|
|
/* <-- The point of no return --> */
|
|
|
|
|
|
- /* Remove the response from the response queue */
|
|
|
- UA_Session_removePublishReq(session, pre);
|
|
|
+ UA_PublishResponse *response = &pre->response;
|
|
|
+ UA_NotificationMessage *message = &response->notificationMessage;
|
|
|
|
|
|
/* Set up the response. Note that this response has no related subscription id */
|
|
|
response->responseHeader.timestamp = UA_DateTime_now();
|
|
@@ -459,12 +404,10 @@ UA_Subscription_reachedPublishReqLimit(UA_Server *server, UA_Session *session)
|
|
|
UA_LOG_DEBUG_SESSION(server->config.logger, session,
|
|
|
"Sending out a publish response triggered by too many publish requests");
|
|
|
UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId,
|
|
|
- UA_MESSAGETYPE_MSG, response,
|
|
|
- &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
+ UA_MESSAGETYPE_MSG, response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
|
|
|
/* Free the response */
|
|
|
- UA_Array_delete(response->results, response->resultsSize,
|
|
|
- &UA_TYPES[UA_TYPES_UINT32]);
|
|
|
+ UA_Array_delete(response->results, response->resultsSize, &UA_TYPES[UA_TYPES_UINT32]);
|
|
|
UA_free(pre); /* no need for UA_PublishResponse_deleteMembers */
|
|
|
|
|
|
return true;
|
|
@@ -480,10 +423,8 @@ Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
return UA_STATUSCODE_GOOD;
|
|
|
|
|
|
UA_StatusCode retval =
|
|
|
- UA_Server_addRepeatedCallback(server,
|
|
|
- (UA_ServerCallback)UA_Subscription_publishTriggeredCallback,
|
|
|
- sub, (UA_UInt32)sub->publishingInterval,
|
|
|
- &sub->publishCallbackId);
|
|
|
+ UA_Server_addRepeatedCallback(server, (UA_ServerCallback)publishCallback,
|
|
|
+ sub, (UA_UInt32)sub->publishingInterval, &sub->publishCallbackId);
|
|
|
if(retval != UA_STATUSCODE_GOOD)
|
|
|
return retval;
|
|
|
|
|
@@ -493,15 +434,13 @@ Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
|
|
|
UA_StatusCode
|
|
|
Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
- UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | Unregister subscription "
|
|
|
- "publishing callback", sub->subscriptionId);
|
|
|
+ UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
|
|
|
+ "Unregister subscription publishing callback", sub->subscriptionId);
|
|
|
|
|
|
if(!sub->publishCallbackIsRegistered)
|
|
|
return UA_STATUSCODE_GOOD;
|
|
|
|
|
|
- UA_StatusCode retval =
|
|
|
- UA_Server_removeRepeatedCallback(server, sub->publishCallbackId);
|
|
|
+ UA_StatusCode retval = UA_Server_removeRepeatedCallback(server, sub->publishCallbackId);
|
|
|
if(retval != UA_STATUSCODE_GOOD)
|
|
|
return retval;
|
|
|
|
|
@@ -512,22 +451,19 @@ Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub)
|
|
|
/* When the session has publish requests stored but the last subscription is
|
|
|
* deleted... Send out empty responses */
|
|
|
void
|
|
|
-UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server,
|
|
|
- UA_Session *session) {
|
|
|
+UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_Session *session) {
|
|
|
/* No session or there are remaining subscriptions */
|
|
|
if(!session || LIST_FIRST(&session->serverSubscriptions))
|
|
|
return;
|
|
|
|
|
|
/* Send a response for every queued request */
|
|
|
UA_PublishResponseEntry *pre;
|
|
|
- while((pre = UA_Session_getPublishReq(session))) {
|
|
|
- UA_Session_removePublishReq(session, pre);
|
|
|
+ while((pre = UA_Session_dequeuePublishReq(session))) {
|
|
|
UA_PublishResponse *response = &pre->response;
|
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
|
|
|
response->responseHeader.timestamp = UA_DateTime_now();
|
|
|
- UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId,
|
|
|
- UA_MESSAGETYPE_MSG, response,
|
|
|
- &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
+ UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId, UA_MESSAGETYPE_MSG,
|
|
|
+ response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
UA_PublishResponse_deleteMembers(response);
|
|
|
UA_free(pre);
|
|
|
}
|