소스 검색

Creating of timedWorker based subscriptions enabled. Change and Delete still pending.

ichrispa 9 년 전
부모
커밋
30570f5042
3개의 변경된 파일63개의 추가작업 그리고 16개의 파일을 삭제
  1. 14 9
      src/server/ua_services_subscription.c
  2. 44 6
      src/server/ua_subscription.c
  3. 5 1
      src/server/ua_subscription.h

+ 14 - 9
src/server/ua_services_subscription.c

@@ -44,7 +44,8 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
     newSubscription->Priority                = request->priority;
     
     UA_Guid jobId = SubscriptionManager_getUniqueGUID(&(session->subscriptionManager));
-    Subscriptions_createdUpdateJob(server, jobId, newSubscription);
+    Subscription_createdUpdateJob(server, jobId, newSubscription);
+    Subscription_registerUpdateJob(server, newSubscription);
     SubscriptionManager_addSubscription(&(session->subscriptionManager), newSubscription);    
 }
 
@@ -153,16 +154,20 @@ void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishReq
     
     // See if any new data is available
     LIST_FOREACH(sub, &manager->ServerSubscriptions, listEntry) {
-        // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
-        LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
-            MonitoredItem_QueuePushDataValue(server, mon);
-	
-        // FIXME: We are forcing notification updates for the subscription. This
-        // should be done by a timed work item.
-        Subscription_updateNotifications(sub);
+        if (sub->timedUpdateIsRegistered == UA_FALSE) {
+            // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
+            // NOTE:  There is a clone of this functionality in the Subscription_timedUpdateNotificationsJob
+            LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
+                MonitoredItem_QueuePushDataValue(server, mon);
+            
+            // FIXME: We are forcing notification updates for the subscription. This
+            // should be done by a timed work item.
+            Subscription_updateNotifications(sub);
+        }
+        
         if(Subscription_queuedNotifications(sub) <= 0)
             continue;
-
+        
         response->subscriptionId = sub->SubscriptionID;
         Subscription_copyTopNotificationMessage(&(response->notificationMessage), sub);
         if (sub->unpublishedNotifications.lh_first->notification->sequenceNumber > sub->SequenceNumber) {

+ 44 - 6
src/server/ua_subscription.c

@@ -14,7 +14,8 @@ UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
     new->LastPublished  = 0;
     new->SequenceNumber = 0;
     memset(&new->timedUpdateJobGuid, 0, sizeof(UA_Guid));
-    new->timedUpdateJob     = UA_NULL;
+    new->timedUpdateJob          = UA_NULL;
+    new->timedUpdateIsRegistered = UA_FALSE;
     LIST_INIT(&new->MonitoredItems);
     LIST_INIT(&new->unpublishedNotifications);
     return new;
@@ -254,14 +255,31 @@ UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscri
 
 static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data) {
     // Timed-Worker/Job Version of updateNotifications
+    UA_Subscription *sub = (UA_Subscription *) data;
+    UA_MonitoredItem *mon;
+    
+    if (!data || !server)
+        return;
+    
+    // This is set by the Subscription_delete function to detere us from fiddling with
+    // this subscription if it is being deleted (not technically thread save, but better
+    // then nothing at all)
+    if (sub->SubscriptionID == 0)
+        return;
+    
+    // FIXME: This should be done by the event system
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
+        MonitoredItem_QueuePushDataValue(server, mon);
+       
+    Subscription_updateNotifications(sub);
     return;
 }
 
 
-UA_StatusCode Subscriptions_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub) {
+UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub) {
     if (server == UA_NULL || sub == UA_NULL)
         return UA_STATUSCODE_BADSERVERINDEXINVALID;
-    
+        
     UA_Job *theWork;
     theWork = (UA_Job *) malloc(sizeof(UA_Job));
     if (!theWork)
@@ -272,16 +290,36 @@ UA_StatusCode Subscriptions_createdUpdateJob(UA_Server *server, UA_Guid jobId, U
     *                     .job.methodCall = { .method = Subscription_timedUpdateNotificationsJob, .data = sub } };
     */
    *theWork = (UA_Job) {  .type = UA_JOBTYPE_METHODCALL,
-                          .job.methodCall = {.method = Subscription_timedUpdateNotificationsJob, .data = NULL} };
+                          . job.methodCall = {.method = Subscription_timedUpdateNotificationsJob, .data = sub} };
    
    sub->timedUpdateJobGuid = jobId;
    sub->timedUpdateJob     = theWork;
    
-   //UA_Server_addRepeatedJob(server, );
-   
    return UA_STATUSCODE_GOOD;
 }
 
+UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub) {
+    UA_Int32 retval = UA_STATUSCODE_GOOD;
+    if (server == UA_NULL || sub == UA_NULL)
+        return UA_STATUSCODE_BADSERVERINDEXINVALID;
+    
+    if (sub->PublishingInterval <= 5 ) 
+        return UA_STATUSCODE_BADNOTSUPPORTED;
+    
+    retval |= UA_Server_addRepeatedJob(server, (UA_Job) *(sub->timedUpdateJob), (UA_UInt32) (sub->PublishingInterval), &(sub->timedUpdateJobGuid));
+    printf("Attempt reg...\n");
+    if (!retval) {
+        sub->timedUpdateIsRegistered = UA_TRUE;
+        printf("done with %u\n", (UA_UInt32) (sub->PublishingInterval));
+    }
+    return retval;
+}
+
+UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub) {
+    return 0;
+}
+
+
 /*****************/
 /* MonitoredItem */
 /*****************/

+ 5 - 1
src/server/ua_subscription.h

@@ -85,6 +85,7 @@ typedef struct UA_Subscription_s {
     UA_UInt32                           SequenceNumber;
     UA_Guid                             timedUpdateJobGuid;
     UA_Job                              *timedUpdateJob;
+    UA_Boolean                          timedUpdateIsRegistered;
     LIST_ENTRY(UA_Subscription_s)       listEntry;
     LIST_HEAD(UA_ListOfUnpublishedNotifications, UA_unpublishedNotification_s) unpublishedNotifications;
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem_s) MonitoredItems;
@@ -98,7 +99,10 @@ UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
 void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
 UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub);
 void Subscription_generateKeepAlive(UA_Subscription *subscription);
-UA_StatusCode Subscriptions_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub);
+UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub);
+UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub);
+UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub);
+
 static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data);
 
 #endif /* UA_SUBSCRIPTION_H_ */