|
@@ -29,14 +29,16 @@ UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server)
|
|
|
|
|
|
/* Delete monitored Items */
|
|
|
UA_MonitoredItem *mon, *tmp_mon;
|
|
|
- LIST_FOREACH_SAFE(mon, &subscription->monitoredItems, listEntry, tmp_mon) {
|
|
|
+ LIST_FOREACH_SAFE(mon, &subscription->monitoredItems,
|
|
|
+ listEntry, tmp_mon) {
|
|
|
LIST_REMOVE(mon, listEntry);
|
|
|
MonitoredItem_delete(server, mon);
|
|
|
}
|
|
|
|
|
|
/* Delete Retransmission Queue */
|
|
|
UA_NotificationMessageEntry *nme, *nme_tmp;
|
|
|
- TAILQ_FOREACH_SAFE(nme, &subscription->retransmissionQueue, listEntry, nme_tmp) {
|
|
|
+ TAILQ_FOREACH_SAFE(nme, &subscription->retransmissionQueue,
|
|
|
+ listEntry, nme_tmp) {
|
|
|
TAILQ_REMOVE(&subscription->retransmissionQueue, nme, listEntry);
|
|
|
UA_NotificationMessage_deleteMembers(&nme->message);
|
|
|
UA_free(nme);
|
|
@@ -56,8 +58,7 @@ UA_Subscription_getMonitoredItem(UA_Subscription *sub,
|
|
|
}
|
|
|
|
|
|
UA_StatusCode
|
|
|
-UA_Subscription_deleteMonitoredItem(UA_Server *server,
|
|
|
- UA_Subscription *sub,
|
|
|
+UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
|
|
|
UA_UInt32 monitoredItemID) {
|
|
|
/* Find the MonitoredItem */
|
|
|
UA_MonitoredItem *mon;
|
|
@@ -158,7 +159,8 @@ prepareNotificationMessage(UA_Subscription *sub,
|
|
|
|
|
|
/* Allocate array of notifications */
|
|
|
dcn->monitoredItems = (UA_MonitoredItemNotification *)
|
|
|
- UA_Array_new(notifications, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
|
|
|
+ UA_Array_new(notifications,
|
|
|
+ &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
|
|
|
if(!dcn->monitoredItems) {
|
|
|
UA_NotificationMessage_deleteMembers(message);
|
|
|
return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
@@ -216,17 +218,19 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
/* Cannot publish without a response */
|
|
|
if(!pre) {
|
|
|
UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | Cannot send a publish response "
|
|
|
- "since the publish queue is empty", sub->subscriptionID);
|
|
|
+ "Subscription %u | Cannot send a publish "
|
|
|
+ "response since the publish queue is empty",
|
|
|
+ sub->subscriptionID);
|
|
|
if(sub->state != UA_SUBSCRIPTIONSTATE_LATE) {
|
|
|
sub->state = UA_SUBSCRIPTIONSTATE_LATE;
|
|
|
} else {
|
|
|
++sub->currentLifetimeCount;
|
|
|
if(sub->currentLifetimeCount > sub->lifeTimeCount) {
|
|
|
UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | End of lifetime for subscription",
|
|
|
- sub->subscriptionID);
|
|
|
- UA_Session_deleteSubscription(server, sub->session, sub->subscriptionID);
|
|
|
+ "Subscription %u | End of lifetime "
|
|
|
+ "for subscription", sub->subscriptionID);
|
|
|
+ UA_Session_deleteSubscription(server, sub->session,
|
|
|
+ sub->subscriptionID);
|
|
|
}
|
|
|
}
|
|
|
return;
|
|
@@ -237,8 +241,8 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
UA_NotificationMessageEntry *retransmission = NULL;
|
|
|
if(notifications > 0) {
|
|
|
/* Allocate the retransmission entry */
|
|
|
- retransmission =
|
|
|
- (UA_NotificationMessageEntry*)UA_malloc(sizeof(UA_NotificationMessageEntry));
|
|
|
+ retransmission = (UA_NotificationMessageEntry*)
|
|
|
+ UA_malloc(sizeof(UA_NotificationMessageEntry));
|
|
|
if(!retransmission) {
|
|
|
UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
"Subscription %u | Could not allocate memory "
|
|
@@ -275,9 +279,9 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
/* Increase the sequence number */
|
|
|
message->sequenceNumber = ++sub->sequenceNumber;
|
|
|
|
|
|
- /* Put the notification message into the retransmission queue. This needs to
|
|
|
- * be done here, so that the message itself is included in the available
|
|
|
- * sequence numbers for acknowledgement. */
|
|
|
+ /* Put the notification message into the retransmission queue. This
|
|
|
+ * needs to be done here, so that the message itself is included in the
|
|
|
+ * available sequence numbers for acknowledgement. */
|
|
|
retransmission->message = response->notificationMessage;
|
|
|
UA_Subscription_addRetransmissionMessage(server, sub, retransmission);
|
|
|
}
|
|
@@ -298,9 +302,11 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
|
|
|
/* Send the response */
|
|
|
UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | Sending out a publish response with %u "
|
|
|
- "notifications", sub->subscriptionID, (UA_UInt32)notifications);
|
|
|
- UA_SecureChannel_sendBinaryMessage(sub->session->channel, pre->requestId, response,
|
|
|
+ "Subscription %u | Sending out a publish response "
|
|
|
+ "with %u notifications", sub->subscriptionID,
|
|
|
+ (UA_UInt32)notifications);
|
|
|
+ UA_SecureChannel_sendBinaryMessage(sub->session->channel,
|
|
|
+ pre->requestId, response,
|
|
|
&UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
|
|
|
/* Reset subscription state to normal. */
|
|
@@ -320,32 +326,41 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
|
|
|
UA_StatusCode
|
|
|
Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
+ UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
+ "Subscription %u | Register subscription "
|
|
|
+ "publishing callback", sub->subscriptionID);
|
|
|
+
|
|
|
if(sub->publishCallbackIsRegistered)
|
|
|
return UA_STATUSCODE_GOOD;
|
|
|
|
|
|
- UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | Register subscription publishing callback",
|
|
|
- sub->subscriptionID);
|
|
|
UA_StatusCode retval =
|
|
|
UA_Server_addRepeatedCallback(server,
|
|
|
- (UA_ServerCallback)UA_Subscription_publishCallback,
|
|
|
- sub, (UA_UInt32)sub->publishingInterval,
|
|
|
- &sub->publishCallbackId);
|
|
|
- if(retval == UA_STATUSCODE_GOOD)
|
|
|
- sub->publishCallbackIsRegistered = true;
|
|
|
- return retval;
|
|
|
+ (UA_ServerCallback)UA_Subscription_publishCallback,
|
|
|
+ sub, (UA_UInt32)sub->publishingInterval,
|
|
|
+ &sub->publishCallbackId);
|
|
|
+ if(retval != UA_STATUSCODE_GOOD)
|
|
|
+ return retval;
|
|
|
+
|
|
|
+ sub->publishCallbackIsRegistered = true;
|
|
|
+ return UA_STATUSCODE_GOOD;
|
|
|
}
|
|
|
|
|
|
UA_StatusCode
|
|
|
Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub) {
|
|
|
- if(!sub->publishCallbackIsRegistered)
|
|
|
- return UA_STATUSCODE_GOOD;
|
|
|
-
|
|
|
UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
"Subscription %u | Unregister subscription "
|
|
|
"publishing callback", sub->subscriptionID);
|
|
|
+
|
|
|
+ if(!sub->publishCallbackIsRegistered)
|
|
|
+ return UA_STATUSCODE_GOOD;
|
|
|
+
|
|
|
+ UA_StatusCode retval =
|
|
|
+ UA_Server_removeRepeatedCallback(server, sub->publishCallbackId);
|
|
|
+ if(retval != UA_STATUSCODE_GOOD)
|
|
|
+ return retval;
|
|
|
+
|
|
|
sub->publishCallbackIsRegistered = false;
|
|
|
- return UA_Server_removeRepeatedCallback(server, sub->publishCallbackId);
|
|
|
+ return UA_STATUSCODE_GOOD;
|
|
|
}
|
|
|
|
|
|
/* When the session has publish requests stored but the last subscription is
|
|
@@ -364,7 +379,8 @@ UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server,
|
|
|
UA_PublishResponse *response = &pre->response;
|
|
|
response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
|
|
|
response->responseHeader.timestamp = UA_DateTime_now();
|
|
|
- UA_SecureChannel_sendBinaryMessage(session->channel, pre->requestId, response,
|
|
|
+ UA_SecureChannel_sendBinaryMessage(session->channel,
|
|
|
+ pre->requestId, response,
|
|
|
&UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
|
|
|
UA_PublishResponse_deleteMembers(response);
|
|
|
UA_free(pre);
|