Ver código fonte

Merge pull request #1091 from jpfr/cosmetic_subscriptions

cosemtic improvements to the subscriptions module
Julius Pfrommer 7 anos atrás
pai
commit
b1730e5993
2 arquivos alterados com 86 adições e 64 exclusões
  1. 78 60
      src/server/ua_subscription.c
  2. 8 4
      src/server/ua_subscription.h

+ 78 - 60
src/server/ua_subscription.c

@@ -16,28 +16,27 @@
 /* MonitoredItem */
 /*****************/
 
-UA_MonitoredItem * UA_MonitoredItem_new() {
-    UA_MonitoredItem *newItem = (UA_MonitoredItem *)UA_malloc(sizeof(UA_MonitoredItem));
+UA_MonitoredItem *
+UA_MonitoredItem_new(void) {
+    /* Allocate the memory */
+    UA_MonitoredItem *newItem =
+        (UA_MonitoredItem*)UA_calloc(1, sizeof(UA_MonitoredItem));
     if(!newItem)
         return NULL;
-    newItem->subscription = NULL;
-    newItem->currentQueueSize = 0;
-    newItem->maxQueueSize = 0;
+
+    /* Remaining members are covered by calloc zeroing out the memory */
     newItem->monitoredItemType = UA_MONITOREDITEMTYPE_CHANGENOTIFY; /* currently hardcoded */
     newItem->timestampsToReturn = UA_TIMESTAMPSTORETURN_SOURCE;
-    UA_String_init(&newItem->indexRange);
     TAILQ_INIT(&newItem->queue);
-    UA_NodeId_init(&newItem->monitoredNodeId);
-    newItem->lastSampledValue = UA_BYTESTRING_NULL;
-    memset(&newItem->sampleJobGuid, 0, sizeof(UA_Guid));
-    newItem->sampleJobIsRegistered = false;
-    newItem->itemId = 0;
     return newItem;
 }
 
-void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+void
+MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    /* Remove the sampling job */
     MonitoredItem_unregisterSampleJob(server, monitoredItem);
-    /* clear the queued samples */
+
+    /* Clear the queued samples */
     MonitoredItem_queuedValue *val, *val_tmp;
     TAILQ_FOREACH_SAFE(val, &monitoredItem->queue, listEntry, val_tmp) {
         TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
@@ -45,24 +44,30 @@ void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
         UA_free(val);
     }
     monitoredItem->currentQueueSize = 0;
+
+    /* Remove the monitored item */
     LIST_REMOVE(monitoredItem, listEntry);
     UA_String_deleteMembers(&monitoredItem->indexRange);
     UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
     UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
-    UA_free(monitoredItem);
+    UA_free(monitoredItem); // TODO: Use a delayed free
 }
 
 static void
 ensureSpaceInMonitoredItemQueue(UA_MonitoredItem *mon) {
+    /* Enough space, nothing to do here */
     if(mon->currentQueueSize < mon->maxQueueSize)
         return;
+
+    /* Get the item to remove */
     MonitoredItem_queuedValue *queueItem;
     if(mon->discardOldest)
         queueItem = TAILQ_FIRST(&mon->queue);
     else
-        queueItem = TAILQ_LAST(&mon->queue,
-                               memberstruct(UA_MonitoredItem,QueueOfQueueDataValues));
-    UA_assert(queueItem); /* When the currentQueueSize > 0, then there is an item */
+        queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
+    UA_assert(queueItem);
+
+    /* Remove the item */
     TAILQ_REMOVE(&mon->queue, queueItem, listEntry);
     UA_DataValue_deleteMembers(&queueItem->value);
     UA_free(queueItem);
@@ -133,7 +138,8 @@ detectValueChange(UA_MonitoredItem *mon, UA_DataValue *value, UA_ByteString *enc
 static UA_Boolean
 sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
                         UA_MonitoredItem *monitoredItem,
-                        UA_DataValue *value, UA_ByteString *valueEncoding) {
+                        UA_DataValue *value,
+                        UA_ByteString *valueEncoding) {
     /* Store the pointer to the stack-allocated bytestring to see if a heap-allocation
      * was necessary */
     UA_Byte *stackValueEncoding = valueEncoding->data;
@@ -201,7 +207,9 @@ sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
     return true;;
 }
 
-void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+void
+UA_MoniteredItem_SampleCallback(UA_Server *server,
+                                UA_MonitoredItem *monitoredItem) {
     UA_Subscription *sub = monitoredItem->subscription;
     if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
@@ -256,7 +264,8 @@ MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
     return retval;
 }
 
-UA_StatusCode MonitoredItem_unregisterSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
+UA_StatusCode
+MonitoredItem_unregisterSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
     if(!mon->sampleJobIsRegistered)
         return UA_STATUSCODE_GOOD;
     mon->sampleJobIsRegistered = false;
@@ -267,28 +276,25 @@ UA_StatusCode MonitoredItem_unregisterSampleJob(UA_Server *server, UA_MonitoredI
 /* Subscription */
 /****************/
 
-UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID) {
-    UA_Subscription *newItem = (UA_Subscription *)UA_malloc(sizeof(UA_Subscription));
+UA_Subscription *
+UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID) {
+    /* Allocate the memory */
+    UA_Subscription *newItem =
+        (UA_Subscription*)UA_calloc(1, sizeof(UA_Subscription));
     if(!newItem)
         return NULL;
+
+    /* Remaining members are covered by calloc zeroing out the memory */
     newItem->session = session;
     newItem->subscriptionID = subscriptionID;
-    newItem->sequenceNumber = 0;
-    newItem->maxKeepAliveCount = 0;
-    newItem->publishingEnabled = false;
-    memset(&newItem->publishJobGuid, 0, sizeof(UA_Guid));
-    newItem->publishJobIsRegistered = false;
-    newItem->currentKeepAliveCount = 0;
-    newItem->currentLifetimeCount = 0;
-    newItem->lastMonitoredItemId = 0;
     newItem->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
-    LIST_INIT(&newItem->monitoredItems);
     TAILQ_INIT(&newItem->retransmissionQueue);
-    newItem->retransmissionQueueSize = 0;
     return newItem;
 }
 
-void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server) {
+void
+UA_Subscription_deleteMembers(UA_Subscription *subscription,
+                              UA_Server *server) {
     Subscription_unregisterPublishJob(server, subscription);
 
     /* Delete monitored Items */
@@ -309,7 +315,8 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
 }
 
 UA_MonitoredItem *
-UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID) {
+UA_Subscription_getMonitoredItem(UA_Subscription *sub,
+                                 UA_UInt32 monitoredItemID) {
     UA_MonitoredItem *mon;
     LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
         if(mon->itemId == monitoredItemID)
@@ -333,19 +340,21 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
 }
 
 static size_t
-countQueuedNotifications(UA_Subscription *sub, UA_Boolean *moreNotifications) {
+countQueuedNotifications(UA_Subscription *sub,
+                         UA_Boolean *moreNotifications) {
+    if(!sub->publishingEnabled)
+        return 0;
+
     size_t notifications = 0;
-    if(sub->publishingEnabled) {
-        UA_MonitoredItem *mon;
-        LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
-            MonitoredItem_queuedValue *qv;
-            TAILQ_FOREACH(qv, &mon->queue, listEntry) {
-                if(notifications >= sub->notificationsPerPublish) {
-                    *moreNotifications = true;
-                    break;
-                }
-                ++notifications;
+    UA_MonitoredItem *mon;
+    LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
+        MonitoredItem_queuedValue *qv;
+        TAILQ_FOREACH(qv, &mon->queue, listEntry) {
+            if(notifications >= sub->notificationsPerPublish) {
+                *moreNotifications = true;
+                break;
             }
+            ++notifications;
         }
     }
     return notifications;
@@ -358,8 +367,7 @@ UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub
     if(server->config.maxRetransmissionQueueSize > 0 &&
        sub->retransmissionQueueSize >= server->config.maxRetransmissionQueueSize) {
         UA_NotificationMessageEntry *lastentry =
-            TAILQ_LAST(&sub->retransmissionQueue,
-                       memberstruct(UA_Subscription,UA_ListOfNotificationMessages));
+            TAILQ_LAST(&sub->retransmissionQueue, ListOfNotificationMessages);
         TAILQ_REMOVE(&sub->retransmissionQueue, lastentry, listEntry);
         --sub->retransmissionQueueSize;
         UA_NotificationMessage_deleteMembers(&lastentry->message);
@@ -372,7 +380,8 @@ UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub
 }
 
 UA_StatusCode
-UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber) {
+UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub,
+                                            UA_UInt32 sequenceNumber) {
     UA_NotificationMessageEntry *entry, *entry_tmp;
     TAILQ_FOREACH_SAFE(entry, &sub->retransmissionQueue, listEntry, entry_tmp) {
         if(entry->message.sequenceNumber != sequenceNumber)
@@ -387,7 +396,8 @@ UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequ
 }
 
 static UA_StatusCode
-prepareNotificationMessage(UA_Subscription *sub, UA_NotificationMessage *message,
+prepareNotificationMessage(UA_Subscription *sub,
+                           UA_NotificationMessage *message,
                            size_t notifications) {
     /* Array of ExtensionObject to hold different kinds of notifications
        (currently only DataChangeNotifications) */
@@ -436,9 +446,11 @@ prepareNotificationMessage(UA_Subscription *sub, UA_NotificationMessage *message
     return UA_STATUSCODE_GOOD;
 }
 
-void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
-                         "Publish Callback", sub->subscriptionID);
+void
+UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
+    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
+                         "Subscription %u | Publish Callback",
+                         sub->subscriptionID);
 
     /* Count the available notifications */
     UA_Boolean moreNotifications = false;
@@ -472,8 +484,9 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         } else {
             ++sub->currentLifetimeCount;
             if(sub->currentLifetimeCount > sub->lifeTimeCount) {
-                UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
-                                     "End of lifetime for subscription", sub->subscriptionID);
+                UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
+                                     "Subscription %u | End of lifetime for subscription",
+                                     sub->subscriptionID);
                 UA_Session_deleteSubscription(server, sub->session, sub->subscriptionID);
             }
         }
@@ -485,13 +498,15 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     UA_NotificationMessageEntry *retransmission = NULL;
     if(notifications > 0) {
         /* Allocate the retransmission entry */
-        retransmission = (UA_NotificationMessageEntry*)UA_malloc(sizeof(UA_NotificationMessageEntry));
+        retransmission =
+            (UA_NotificationMessageEntry*)UA_malloc(sizeof(UA_NotificationMessageEntry));
         if(!retransmission) {
             UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
                                    "Subscription %u | Could not allocate memory "
                                    "for retransmission", sub->subscriptionID);
             return;
         }
+
         /* Prepare the response */
         UA_StatusCode retval =
             prepareNotificationMessage(sub, message, notifications);
@@ -531,7 +546,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     /* Get the available sequence numbers from the retransmission queue */
     size_t available = sub->retransmissionQueueSize;
     if(available > 0) {
-        response->availableSequenceNumbers = (UA_UInt32 *)UA_alloca(available * sizeof(UA_UInt32));
+        response->availableSequenceNumbers =
+            (UA_UInt32*)UA_alloca(available * sizeof(UA_UInt32));
         response->availableSequenceNumbersSize = available;
         size_t i = 0;
         UA_NotificationMessageEntry *nme;
@@ -588,8 +604,8 @@ Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub) {
     if(!sub->publishJobIsRegistered)
         return UA_STATUSCODE_GOOD;
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                         "Subscription %u | Unregister subscription publishing callback",
-                         sub->subscriptionID);
+                         "Subscription %u | Unregister subscription "
+                         "publishing callback", sub->subscriptionID);
     sub->publishJobIsRegistered = false;
     return UA_Server_removeRepeatedJob(server, sub->publishJobGuid);
 }
@@ -597,9 +613,11 @@ Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub) {
 /* When the session has publish requests stored but the last subscription is
    deleted... Send out empty responses */
 void
-UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_NodeId *sessionToken) {
+UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server,
+                                                    UA_NodeId *sessionToken) {
     /* Get session */
-    UA_Session *session = UA_SessionManager_getSession(&server->sessionManager, sessionToken);
+    UA_Session *session =
+        UA_SessionManager_getSession(&server->sessionManager, sessionToken);
     UA_NodeId_delete(sessionToken);
 
     /* No session or there are remaining subscriptions */

+ 8 - 4
src/server/ua_subscription.h

@@ -27,6 +27,8 @@ typedef struct MonitoredItem_queuedValue {
     UA_DataValue value;
 } MonitoredItem_queuedValue;
 
+typedef TAILQ_HEAD(QueuedValueQueue, MonitoredItem_queuedValue) QueuedValueQueue;
+
 typedef struct UA_MonitoredItem {
     LIST_ENTRY(UA_MonitoredItem) listEntry;
 
@@ -53,10 +55,10 @@ typedef struct UA_MonitoredItem {
 
     /* Sample Queue */
     UA_ByteString lastSampledValue;
-    TAILQ_HEAD(QueueOfQueueDataValues, MonitoredItem_queuedValue) queue;
+    QueuedValueQueue queue;
 } UA_MonitoredItem;
 
-UA_MonitoredItem *UA_MonitoredItem_new(void);
+UA_MonitoredItem * UA_MonitoredItem_new(void);
 void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem);
 void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem);
 UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon);
@@ -80,6 +82,8 @@ typedef enum {
     UA_SUBSCRIPTIONSTATE_KEEPALIVE
 } UA_SubscriptionState;
 
+typedef TAILQ_HEAD(ListOfNotificationMessages, UA_NotificationMessageEntry) ListOfNotificationMessages;
+
 struct UA_Subscription {
     LIST_ENTRY(UA_Subscription) listEntry;
 
@@ -108,11 +112,11 @@ struct UA_Subscription {
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) monitoredItems;
 
     /* Retransmission Queue */
-    TAILQ_HEAD(UA_ListOfNotificationMessages, UA_NotificationMessageEntry) retransmissionQueue;
+    ListOfNotificationMessages retransmissionQueue;
     UA_UInt32 retransmissionQueueSize;
 };
 
-UA_Subscription *UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID);
+UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID);
 void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server);
 UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub);
 UA_StatusCode Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub);