Julius Pfrommer 9 роки тому
батько
коміт
f65c15b0c2
1 змінених файлів з 55 додано та 62 видалено
  1. 55 62
      src/server/ua_subscription.c

+ 55 - 62
src/server/ua_subscription.c

@@ -44,7 +44,7 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
     }
     
     // Unhook/Unregister any timed work assiociated with this subscription
-    if (subscription->timedUpdateJob != UA_NULL){
+    if(subscription->timedUpdateJob != UA_NULL){
         Subscription_unregisterUpdateJob(server, subscription);
         UA_free(subscription->timedUpdateJob);
     }
@@ -53,7 +53,7 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
 }
 
 UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
-    if (!subscription)
+    if(!subscription)
         return 0;
 
     UA_UInt32 j = 0;
@@ -103,19 +103,19 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
             monItemsChangeT+=mon->QueueSize.currentValue;
 	    else if((mon->MonitoredItemType & MONITOREDITEM_TYPE_STATUSNOTIFY) != 0)
             monItemsStatusT+=mon->QueueSize.currentValue;
-	    else if ((mon->MonitoredItemType & MONITOREDITEM_TYPE_EVENTNOTIFY)  != 0)
+	    else if((mon->MonitoredItemType & MONITOREDITEM_TYPE_EVENTNOTIFY)  != 0)
             monItemsEventT+=mon->QueueSize.currentValue;
     }
     
     // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
-    if (Subscription_queuedNotifications(subscription) >= 10) {
+    if(Subscription_queuedNotifications(subscription) >= 10) {
         // Remove last entry
         LIST_FOREACH(msg, &subscription->unpublishedNotifications, listEntry)
             LIST_REMOVE(msg, listEntry);
         UA_free(msg);
     }
     
-    if (monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
+    if(monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
         // Decrement KeepAlive
         subscription->KeepAliveCount.currentValue--;
         // +- Generate KeepAlive msg if counter overruns
@@ -133,8 +133,8 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
     // list of Queued values from all monitoredItems of that type
     msg->notification->notificationDataSize = ISNOTZERO(monItemsChangeT);
     // + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
-    msg->notification->notificationData = UA_malloc(sizeof(UA_ExtensionObject) *
-                                                    msg->notification->notificationDataSize);
+    msg->notification->notificationData = UA_Array_new(&UA_TYPES[UA_TYPES_EXTENSIONOBJECT], 
+                                                       msg->notification->notificationDataSize);
     
     for(int notmsgn=0; notmsgn < msg->notification->notificationDataSize; notmsgn++) {
         // Set the notification message type and encoding for each of 
@@ -147,7 +147,8 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
             changeNotification = UA_malloc(sizeof(UA_DataChangeNotification));
 	
             // Create one DataChangeNotification for each queue item held in each monitoredItems queue:
-            changeNotification->monitoredItems = UA_Array_new(&UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION], monItemsChangeT);
+            changeNotification->monitoredItems = UA_Array_new(&UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION],
+                                                              monItemsChangeT);
 	
             // Scan all monitoredItems in this subscription and have their queue transformed into an Array of
             // the propper NotificationMessageType (Status, Change, Event)
@@ -156,7 +157,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
                 if(mon->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || !TAILQ_FIRST(&mon->queue))
                     continue;
                 // Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
-                monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications(&(changeNotification->monitoredItems[monItemsChangeT]), mon);
+                monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications(&changeNotification->monitoredItems[monItemsChangeT], mon);
                 MonitoredItem_ClearQueue(mon);
             }
 
@@ -167,11 +168,11 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
             msg->notification->notificationData[notmsgn].body.length =
                 UA_calcSizeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
             msg->notification->notificationData[notmsgn].body.data   =
-                UA_calloc((msg->notification->notificationData[notmsgn]).body.length, sizeof(void*));
+                UA_calloc(msg->notification->notificationData[notmsgn].body.length, sizeof(UA_Byte));
         
             notificationOffset = 0;
             UA_encodeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION],
-                            &(msg->notification->notificationData[notmsgn].body), &notificationOffset);
+                            &msg->notification->notificationData[notmsgn].body, &notificationOffset);
 	
             // FIXME: Not properly freed!
             for(unsigned int i=0; i<monItemsChangeT; i++) {
@@ -180,9 +181,9 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
             }
             UA_free(changeNotification->monitoredItems);
             UA_free(changeNotification);
-        } else if (notmsgn == 1) {
+        } else if(notmsgn == 1) {
             // FIXME: Constructing a StatusChangeNotification is not implemented
-        } else if (notmsgn == 2) {
+        } else if(notmsgn == 2) {
             // FIXME: Constructing a EventListNotification is not implemented
         }
     }
@@ -209,7 +210,7 @@ void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Sub
     if(!dst)
         return;
     
-    if (Subscription_queuedNotifications(sub) == 0) {
+    if(Subscription_queuedNotifications(sub) == 0) {
       dst->notificationDataSize = 0;
       dst->publishTime = UA_DateTime_now();
       dst->sequenceNumber = 0;
@@ -221,13 +222,13 @@ void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Sub
     dst->publishTime = latest->publishTime;
     dst->sequenceNumber = latest->sequenceNumber;
     
-    if (latest->notificationDataSize == 0) return;
+    if(latest->notificationDataSize == 0) return;
     
     dst->notificationData = (UA_ExtensionObject *) UA_malloc(sizeof(UA_ExtensionObject));
     dst->notificationData->encoding = latest->notificationData->encoding;
     dst->notificationData->typeId   = latest->notificationData->typeId;
-    UA_ByteString_copy((UA_String *) &(latest->notificationData->body),
-                       (UA_String *) &(dst->notificationData->body));
+    UA_ByteString_copy(&latest->notificationData->body,
+                       &dst->notificationData->body);
 }
 
 UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
@@ -240,8 +241,8 @@ UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscri
             continue;
         LIST_REMOVE(not, listEntry);
         if(not->notification != NULL) {
-            if (not->notification->notificationData != NULL) {
-                if (not->notification->notificationData->body.data != NULL)
+            if(not->notification->notificationData != NULL) {
+                if(not->notification->notificationData->body.data != NULL)
                     UA_free(not->notification->notificationData->body.data);
                 UA_free(not->notification->notificationData);
             }
@@ -260,13 +261,13 @@ static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *da
     UA_Subscription *sub = (UA_Subscription *) data;
     UA_MonitoredItem *mon;
     
-    if (!data || !server)
+    if(!data || !server)
         return;
     
     // This is set by the Subscription_delete function to detere us from fiddling with
     // this subscription if it is being deleted (not technically thread save, but better
     // then nothing at all)
-    if (sub->SubscriptionID == 0)
+    if(sub->SubscriptionID == 0)
         return;
     
     // FIXME: This should be done by the event system
@@ -279,20 +280,16 @@ static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *da
 
 
 UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub) {
-    if (server == UA_NULL || sub == UA_NULL)
+    if(server == UA_NULL || sub == UA_NULL)
         return UA_STATUSCODE_BADSERVERINDEXINVALID;
         
     UA_Job *theWork;
     theWork = (UA_Job *) malloc(sizeof(UA_Job));
-    if (!theWork)
+    if(!theWork)
         return UA_STATUSCODE_BADOUTOFMEMORY;
     
-   /*
-    *   UA_Job theWork = {.type = UA_JOBTYPE_METHODCALL, 
-    *                     .job.methodCall = { .method = Subscription_timedUpdateNotificationsJob, .data = sub } };
-    */
-   *theWork = (UA_Job) {  .type = UA_JOBTYPE_METHODCALL,
-                          .job.methodCall = {.method = Subscription_timedUpdateNotificationsJob, .data = sub} };
+   *theWork = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
+                        .job.methodCall = {.method = Subscription_timedUpdateNotificationsJob, .data = sub} };
    
    sub->timedUpdateJobGuid = jobId;
    sub->timedUpdateJob     = theWork;
@@ -302,27 +299,28 @@ UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA
 
 UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub) {
     UA_Int32 retval = UA_STATUSCODE_GOOD;
-    if (server == UA_NULL || sub == UA_NULL)
+    if(server == UA_NULL || sub == UA_NULL)
         return UA_STATUSCODE_BADSERVERINDEXINVALID;
     
-    if (sub->PublishingInterval <= 5 ) 
+    if(sub->PublishingInterval <= 5 ) 
         return UA_STATUSCODE_BADNOTSUPPORTED;
     
     // Practically enough, the client sends a uint32 in ms, which we store as datetime, which here is required in as uint32 in ms as the interval
 #ifdef _MSC_VER
-    retval |= UA_Server_addRepeatedJob(server, *(sub->timedUpdateJob), (UA_UInt32) (sub->PublishingInterval), &(sub->timedUpdateJobGuid));
+    retval |= UA_Server_addRepeatedJob(server, *(sub->timedUpdateJob), sub->PublishingInterval,
+                                       &sub->timedUpdateJobGuid);
 #else
-    retval |= UA_Server_addRepeatedJob(server, (UA_Job)*(sub->timedUpdateJob), (UA_UInt32)(sub->PublishingInterval), &(sub->timedUpdateJobGuid));
+    retval |= UA_Server_addRepeatedJob(server, *sub->timedUpdateJob, sub->PublishingInterval,
+                                       &sub->timedUpdateJobGuid);
 #endif
-    if (!retval) {
+    if(!retval)
         sub->timedUpdateIsRegistered = UA_TRUE;
-    }
     return retval;
 }
 
 UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub) {
     UA_Int32 retval = UA_STATUSCODE_GOOD;
-    if (server == UA_NULL || sub == UA_NULL)
+    if(server == UA_NULL || sub == UA_NULL)
         return UA_STATUSCODE_BADSERVERINDEXINVALID;
     
     retval |= UA_Server_removeRepeatedJob(server, sub->timedUpdateJobGuid);
@@ -444,18 +442,17 @@ UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, cons
     case UA_ATTRIBUTEID_EVENTNOTIFIER:
         break;
     case UA_ATTRIBUTEID_VALUE: 
-        if (src->nodeClass == UA_NODECLASS_VARIABLE) {
-            if ( srcAsVariableNode->valueSource == UA_VALUESOURCE_VARIANT) {
-                UA_Variant_copy( (const UA_Variant *) &((const UA_VariableNode *) src)->value, dst);
+        if(src->nodeClass == UA_NODECLASS_VARIABLE) {
+            const UA_VariableNode *vsrc = (const UA_VariableNode*)src;
+            if(srcAsVariableNode->valueSource == UA_VALUESOURCE_VARIANT) {
+                UA_Variant_copy(&vsrc->value.variant, dst);
                 samplingError = UA_FALSE;
-            }
-            else if (srcAsVariableNode->valueSource == UA_VALUESOURCE_DATASOURCE) {
+            } else if(srcAsVariableNode->valueSource == UA_VALUESOURCE_DATASOURCE) {
                 // todo: handle numeric ranges
-                if(srcAsVariableNode->value.dataSource.read(((const UA_VariableNode *) src)->value.dataSource.handle,
-                                                            (UA_Boolean) UA_TRUE, UA_NULL, &sourceDataValue)
-                   == UA_STATUSCODE_GOOD) {
-                    UA_Variant_copy( (const UA_Variant *) &(sourceDataValue.value), dst);
-                    if (sourceDataValue.value.data != NULL) {
+                if(srcAsVariableNode->value.dataSource.read(vsrc->value.dataSource.handle, UA_TRUE, UA_NULL,
+                                                            &sourceDataValue) == UA_STATUSCODE_GOOD) {
+                    UA_Variant_copy(&sourceDataValue.value, dst);
+                    if(sourceDataValue.value.data != NULL) {
                         UA_deleteMembers(sourceDataValue.value.data, sourceDataValue.value.type);
                         UA_free(sourceDataValue.value.data);
                         sourceDataValue.value.data = NULL;
@@ -502,7 +499,7 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
   
     // FIXME: Actively suppress non change value based monitoring. There should be
     // another function to handle status and events.
-    if (monitoredItem->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
+    if(monitoredItem->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
         return;
 
     newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
@@ -513,7 +510,6 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
     newvalue->listEntry.tqe_prev = UA_NULL;
     UA_Variant_init(&newvalue->value);
 
-  
     // Verify that the *Node being monitored is still valid
     // Looking up the in the nodestore is only necessary if we suspect that it is changed during writes
     // e.g. in multithreaded applications
@@ -533,16 +529,15 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
     }
   
     if(monitoredItem->QueueSize.currentValue >= monitoredItem->QueueSize.maxValue) {
-        if(monitoredItem->DiscardOldest == UA_TRUE) {
-            queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
-            TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
-            UA_free(queueItem);
-            monitoredItem->QueueSize.currentValue--;
-        } else {
+        if(monitoredItem->DiscardOldest != UA_TRUE) {
             // We cannot remove the oldest value and theres no queue space left. We're done here.
             UA_free(newvalue);
             return;
         }
+        queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
+        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
+        UA_free(queueItem);
+        monitoredItem->QueueSize.currentValue--;
     }
   
     // encode the data to find if its different to the previous
@@ -556,18 +551,16 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
         monitoredItem->QueueSize.currentValue++;
         monitoredItem->LastSampled = UA_DateTime_now();
     } else {
-        if (UA_String_equal(&newValueAsByteString, &monitoredItem->LastSampledValue) == UA_TRUE) {
+        if(UA_String_equal(&newValueAsByteString, &monitoredItem->LastSampledValue) == UA_TRUE) {
             UA_Variant_deleteMembers(&newvalue->value);
             UA_free(newvalue);
             UA_String_deleteMembers(&newValueAsByteString);
             return;
-        } else {
-            UA_ByteString_deleteMembers(&monitoredItem->LastSampledValue);
-            monitoredItem->LastSampledValue = newValueAsByteString;
-            TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
-            monitoredItem->QueueSize.currentValue++;
-            monitoredItem->LastSampled = UA_DateTime_now();
         }
+        UA_ByteString_deleteMembers(&monitoredItem->LastSampledValue);
+        monitoredItem->LastSampledValue = newValueAsByteString;
+        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
+        monitoredItem->QueueSize.currentValue++;
+        monitoredItem->LastSampled = UA_DateTime_now();
     }
-    return;
 }