Browse Source

Example client now demonstates usage of dataChange event hook and subscription usage.

ichrispa 9 years ago
parent
commit
b3de134193
3 changed files with 168 additions and 34 deletions
  1. 19 15
      examples/client.c
  2. 14 8
      include/ua_client.h
  3. 135 11
      src/client/ua_client.c

+ 19 - 15
examples/client.c

@@ -12,7 +12,7 @@
 
 void handler_TheAnswerChanged(UA_UInt32 handle, UA_DataValue *value);
 void handler_TheAnswerChanged(UA_UInt32 handle, UA_DataValue *value) {
-    printf("Handler called");
+    printf("The Answer has changed!\n");
     return;
 }
 
@@ -56,7 +56,22 @@ int main(int argc, char *argv[]) {
     }
     UA_BrowseRequest_deleteMembers(&bReq);
     UA_BrowseResponse_deleteMembers(&bResp);
-
+    
+    // Create a subscription
+    UA_Int32 subId = UA_Client_newSubscription(client, 0);
+    if (subId)
+        printf("Create subscription succeeded, id %u\n", subId);
+    
+    // Monitor TheAnswer
+    UA_NodeId monitorThis;
+    monitorThis = UA_NODEID_STRING_ALLOC(1, "the.answer");
+    UA_UInt32 monId = UA_Client_monitorItemChanges(client, subId, monitorThis, UA_ATTRIBUTEID_VALUE, &handler_TheAnswerChanged );
+    if (monId)
+        printf("Monitoring 'the.answer', id %u\n", subId);
+    UA_NodeId_deleteMembers(&monitorThis);
+    UA_Client_doPublish(client);
+    UA_Client_doPublish(client);
+    
     UA_Int32 value = 0;
     // Read node's value
     printf("\nReading the value of node (1, \"the.answer\"):\n");
@@ -100,20 +115,9 @@ int main(int argc, char *argv[]) {
     UA_WriteRequest_deleteMembers(&wReq);
     UA_WriteResponse_deleteMembers(&wResp);
     
-    // Create a subscription
-    UA_Int32 subId = UA_Client_newSubscription(client);
-    if (subId)
-        printf("Create subscription succeeded, id %u\n", subId);
-    
-    // Monitor TheAnswer
-    UA_NodeId monitorThis;
-    monitorThis = UA_NODEID_STRING_ALLOC(1, "the.answer");
-    UA_UInt32 monId = UA_Client_monitorItemChanges(client, subId, monitorThis, UA_ATTRIBUTEID_VALUE, &handler_TheAnswerChanged );
-    if (monId)
-        printf("Monitoring 'the.answer', id %u\n", subId);
-    UA_NodeId_deleteMembers(&monitorThis);
-    UA_Client_unMonitorItemChanges(client, subId, monId);
+    UA_Client_doPublish(client);
     
+    // Delete our subscription (which also unmonitors all items)
     if(!UA_Client_removeSubscription(client, subId))
         printf("Subscription removed\n");
     

+ 14 - 8
include/ua_client.h

@@ -63,6 +63,10 @@ UA_DeleteReferencesResponse UA_EXPORT
 
 
 #ifdef ENABLE_SUBSCRIPTIONS
+typedef struct UA_Client_NotificationsAckNumber_s {
+    UA_SubscriptionAcknowledgement subAck;
+    LIST_ENTRY(UA_Client_NotificationsAckNumber_s) listEntry;
+} UA_Client_NotificationsAckNumber;
 
 typedef struct UA_Client_MonitoredItem_s {
     UA_UInt32          MonitoredItemId;
@@ -88,14 +92,16 @@ typedef struct UA_Client_Subscription_s {
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_Client_MonitoredItem_s) MonitoredItems;
 } UA_Client_Subscription;
 
-UA_CreateSubscriptionResponse   UA_Client_createSubscription(UA_Client *client, UA_CreateSubscriptionRequest *request);
-UA_DeleteSubscriptionsResponse  UA_Client_deleteSubscriptions(UA_Client *client, UA_DeleteSubscriptionsRequest *request);
-UA_CreateMonitoredItemsResponse UA_Client_createMonitoredItems(UA_Client *client, UA_CreateMonitoredItemsRequest *request);
-UA_DeleteMonitoredItemsResponse UA_Client_deleteMonitoredItems(UA_Client *client, UA_DeleteMonitoredItemsRequest *request);
+UA_CreateSubscriptionResponse   UA_EXPORT UA_Client_createSubscription(UA_Client *client, UA_CreateSubscriptionRequest *request);
+UA_ModifySubscriptionResponse   UA_EXPORT UA_Client_modifySubscription(UA_Client *client, UA_ModifySubscriptionRequest *request);
+UA_DeleteSubscriptionsResponse  UA_EXPORT UA_Client_deleteSubscriptions(UA_Client *client, UA_DeleteSubscriptionsRequest *request);
+UA_CreateMonitoredItemsResponse UA_EXPORT UA_Client_createMonitoredItems(UA_Client *client, UA_CreateMonitoredItemsRequest *request);
+UA_DeleteMonitoredItemsResponse UA_EXPORT UA_Client_deleteMonitoredItems(UA_Client *client, UA_DeleteMonitoredItemsRequest *request);
+UA_PublishResponse              UA_EXPORT UA_Client_publish(UA_Client *client, UA_PublishRequest *request);
 
-UA_Int32      UA_EXPORT UA_Client_newSubscription(UA_Client *client);
+UA_Int32      UA_EXPORT UA_Client_newSubscription(UA_Client *client, UA_Int32 publishInterval);
 UA_StatusCode UA_EXPORT UA_Client_removeSubscription(UA_Client *client, UA_UInt32 subscriptionId);
-void          UA_EXPORT UA_Client_modifySubscription(UA_Client *client);
+//void UA_EXPORT UA_Client_modifySubscription(UA_Client *client);
 
 UA_UInt32     UA_EXPORT UA_Client_monitorItemChanges(UA_Client *client, UA_UInt32 subscriptionId, 
                                                      UA_NodeId nodeId, UA_UInt32 attributeID, 
@@ -103,8 +109,8 @@ UA_UInt32     UA_EXPORT UA_Client_monitorItemChanges(UA_Client *client, UA_UInt3
 UA_StatusCode UA_EXPORT UA_Client_unMonitorItemChanges(UA_Client *client, UA_UInt32 subscriptionId, 
                                                        UA_UInt32 monitoredItemId );
 
-void UA_EXPORT UA_Client_publish(UA_Client *client);
-
+void UA_EXPORT UA_Client_doPublish(UA_Client *client);
+UA_Boolean UA_Client_processPublishRx(UA_Client *client, UA_PublishResponse response);
 
 #endif
 #endif /* UA_CLIENT_H_ */

+ 135 - 11
src/client/ua_client.c

@@ -19,6 +19,7 @@ struct UA_Client {
     
 #ifdef ENABLE_SUBSCRIPTIONS
     UA_Int32 monitoredItemHandles;
+    LIST_HEAD(UA_ListOfUnacknowledgedNotificationNumbers, UA_Client_NotificationsAckNumber_s) pendingNotificationsAcks;
     LIST_HEAD(UA_ListOfClientSubscriptionItems, UA_Client_Subscription_s) subscriptions;
 #endif
     
@@ -52,6 +53,7 @@ UA_Client * UA_Client_new(UA_ClientConfig config, UA_Logger logger) {
 
 #ifdef ENABLE_SUBSCRIPTIONS
     client->monitoredItemHandles = 0;
+    LIST_INIT(&client->pendingNotificationsAcks);
     LIST_INIT(&client->subscriptions);
 #endif
     return client;
@@ -578,6 +580,13 @@ UA_DeleteSubscriptionsResponse UA_Client_deleteSubscriptions(UA_Client *client,
     return response;
 }
 
+UA_ModifySubscriptionResponse UA_Client_modifySubscription(UA_Client *client, UA_ModifySubscriptionRequest *request) {
+    UA_ModifySubscriptionResponse response;
+    synchronousRequest(client, request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
+                       &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
+    return response;
+}
+
 UA_CreateMonitoredItemsResponse UA_Client_createMonitoredItems(UA_Client *client, UA_CreateMonitoredItemsRequest *request) {
     UA_CreateMonitoredItemsResponse response;
     synchronousRequest(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
@@ -592,7 +601,14 @@ UA_DeleteMonitoredItemsResponse UA_Client_deleteMonitoredItems(UA_Client *client
     return response;
 }
 
-UA_Int32 UA_Client_newSubscription(UA_Client *client) {
+UA_PublishResponse UA_Client_publish(UA_Client *client, UA_PublishRequest *request) {
+    UA_PublishResponse response;
+    synchronousRequest(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
+                       &response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+    return response;
+}
+
+UA_Int32 UA_Client_newSubscription(UA_Client *client, UA_Int32 publishInterval) {
     UA_Int32 retval;
     UA_CreateSubscriptionRequest aReq;
     UA_CreateSubscriptionResponse aRes;
@@ -604,7 +620,7 @@ UA_Int32 UA_Client_newSubscription(UA_Client *client) {
     aReq.publishingEnabled = UA_TRUE;
     aReq.requestedLifetimeCount = 100;
     aReq.requestedMaxKeepAliveCount = 10;
-    aReq.requestedPublishingInterval = 100;
+    aReq.requestedPublishingInterval = publishInterval;
     
     aRes = UA_Client_createSubscription(client, &aReq);
     
@@ -632,7 +648,7 @@ UA_Int32 UA_Client_newSubscription(UA_Client *client) {
 
 UA_StatusCode UA_Client_removeSubscription(UA_Client *client, UA_UInt32 subscriptionId) {
     UA_Client_Subscription *sub;
-    UA_StatusCode retval;
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
     
     LIST_FOREACH(sub, &(client->subscriptions), listEntry) {
         if (sub->SubscriptionID == subscriptionId)
@@ -653,6 +669,13 @@ UA_StatusCode UA_Client_removeSubscription(UA_Client *client, UA_UInt32 subscrip
     request.subscriptionIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
     *(request.subscriptionIds) = sub->SubscriptionID;
     
+    UA_Client_MonitoredItem *mon;
+    LIST_FOREACH(mon, &(sub->MonitoredItems), listEntry) {
+        retval |= UA_Client_unMonitorItemChanges(client, sub->SubscriptionID, mon->MonitoredItemId);
+    }
+    if (retval != UA_STATUSCODE_GOOD)
+        return retval;
+    
     response = UA_Client_deleteSubscriptions(client, &request);
     
     if (response.resultsSize > 0)
@@ -662,8 +685,6 @@ UA_StatusCode UA_Client_removeSubscription(UA_Client *client, UA_UInt32 subscrip
     
     if (retval == UA_STATUSCODE_GOOD) {
         LIST_REMOVE(sub, listEntry);
-        // FIXME: On the serverside, monitoredItems are deleted along with the
-        // subscription... on the clientside not yet.
         UA_free(sub);
     }
     UA_DeleteSubscriptionsRequest_deleteMembers(&request);
@@ -782,16 +803,119 @@ UA_StatusCode UA_Client_unMonitorItemChanges(UA_Client *client, UA_UInt32 subscr
     return retval;
 }
 
-void UA_Client_modifySubscription(UA_Client *client) {}
+UA_Boolean UA_Client_processPublishRx(UA_Client *client, UA_PublishResponse response) {
+    UA_Client_Subscription *sub;
+    UA_Client_MonitoredItem *mon;
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
+    
+    if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
+        return UA_FALSE;
+    
+    // Check if the server has acknowledged any of our ACKS
+    // Note that a list of serverside status codes may be send without valid publish data, i.e. 
+    // during keepalives or no data availability
+    UA_Client_NotificationsAckNumber *tmpAck = client->pendingNotificationsAcks.lh_first;
+    UA_Client_NotificationsAckNumber *nxtAck = tmpAck;
+    for(int i=0; i<response.resultsSize && nxtAck != NULL; i++) {
+        tmpAck = nxtAck;
+        nxtAck = tmpAck->listEntry.le_next;
+        if (response.results[i] == UA_STATUSCODE_GOOD) {
+            LIST_REMOVE(tmpAck, listEntry);
+            UA_free(tmpAck);
+        }
+    }
+    
+    if(response.subscriptionId == 0)
+        return UA_FALSE;
+    
+    LIST_FOREACH(sub, &(client->subscriptions), listEntry) {
+        if (sub->SubscriptionID == response.subscriptionId)
+            break;
+    }
+    if (sub == NULL)
+        return UA_FALSE;
+    
+    UA_NotificationMessage msg = response.notificationMessage;
+    UA_DataChangeNotification dataChangeNotification;
+    size_t decodingOffset = 0;
+    for (int k=0; k<msg.notificationDataSize; k++) {
+        if (msg.notificationData[k].encoding == UA_EXTENSIONOBJECT_ENCODINGMASK_BODYISBYTESTRING) {
+            if (msg.notificationData[k].typeId.namespaceIndex == 0 && msg.notificationData[k].typeId.identifier.numeric == 811 ) {
+                // This is a dataChangeNotification
+                retval |= UA_DataChangeNotification_decodeBinary(&(msg.notificationData[k].body), &decodingOffset, &dataChangeNotification);
+                UA_MonitoredItemNotification *mitemNot;
+                for(int i=0; i<dataChangeNotification.monitoredItemsSize; i++) {
+                    mitemNot = &dataChangeNotification.monitoredItems[i];
+                    // find this client handle
+                    LIST_FOREACH(mon, &(sub->MonitoredItems), listEntry) {
+                        if (mon->ClientHandle == mitemNot->clientHandle) {
+                            mon->handler(mitemNot->clientHandle, &(mitemNot->value));
+                            break;
+                        }
+                    }
+                }
+            }
+            else if (msg.notificationData[k].typeId.namespaceIndex == 0 && msg.notificationData[k].typeId.identifier.numeric == 820 ) {
+                //FIXME: This is a statusChangeNotification (not supported yet)
+                continue;
+            }
+            else if (msg.notificationData[k].typeId.namespaceIndex == 0 && msg.notificationData[k].typeId.identifier.numeric == 916 ) {
+                //FIXME: This is an EventNotification
+                continue;
+            }
+        }
+    }
+    
+    // We processed this message, add it to the list of pending acks (but make sure it's not in the list first)
+    LIST_FOREACH(tmpAck, &(client->pendingNotificationsAcks), listEntry) {
+        if (tmpAck->subAck.sequenceNumber == msg.sequenceNumber &&
+            tmpAck->subAck.subscriptionId == response.subscriptionId)
+            break;
+    }
+    if (tmpAck == NULL ){
+        tmpAck = (UA_Client_NotificationsAckNumber *) malloc(sizeof(UA_Client_NotificationsAckNumber));
+        tmpAck->subAck.sequenceNumber = msg.sequenceNumber;
+        tmpAck->subAck.subscriptionId = sub->SubscriptionID;
+        LIST_INSERT_HEAD(&(client->pendingNotificationsAcks), tmpAck, listEntry);
+    }
+    
+    return response.moreNotifications;
+}
 
-void UA_Client_publish(UA_Client *client) {
+void UA_Client_doPublish(UA_Client *client) {
     UA_PublishRequest request;
     UA_PublishResponse response;
-    UA_PublishRequest_init(&request);
-    UA_PublishResponse_init(&response);
+    UA_Client_NotificationsAckNumber *ack;
+    UA_Boolean moreNotifications = UA_TRUE;
+    int index = 0 ;
     
-    UA_PublishRequest_deleteMembers(&request);
-    UA_PublishResponse_deleteMembers(&response);
+    do {
+        UA_PublishRequest_init(&request);
+        UA_PublishResponse_init(&response);
+        
+        request.subscriptionAcknowledgementsSize = 0;
+        LIST_FOREACH(ack, &(client->pendingNotificationsAcks), listEntry) {
+            request.subscriptionAcknowledgementsSize++;
+        }
+        request.subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement *) UA_malloc(sizeof(UA_SubscriptionAcknowledgement)*request.subscriptionAcknowledgementsSize);
+        
+        index = 0;
+        LIST_FOREACH(ack, &(client->pendingNotificationsAcks), listEntry) {
+            ack = client->pendingNotificationsAcks.lh_first;
+            request.subscriptionAcknowledgements[index].sequenceNumber = ack->subAck.sequenceNumber;
+            request.subscriptionAcknowledgements[index].subscriptionId = ack->subAck.subscriptionId;
+            index++;
+        }
+        
+        response = UA_Client_publish(client, &request);
+        if (response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
+            moreNotifications = UA_Client_processPublishRx(client, response);
+        else
+            moreNotifications = UA_FALSE;
+        
+        UA_PublishResponse_deleteMembers(&response);
+        UA_PublishRequest_deleteMembers(&request);
+    }  while(moreNotifications == UA_TRUE);
     return;
 }