Browse Source

client side event support (#1161)

This patchset adds client side support for Events
Frank Meerkötter 6 years ago
parent
commit
ee781f50c2

+ 2 - 0
examples/CMakeLists.txt

@@ -42,6 +42,8 @@ endif()
 
 add_example(tutorial_client_firststeps tutorial_client_firststeps.c)
 
+add_example(tutorial_client_events tutorial_client_events.c)
+
 ##################
 # Example Server #
 ##################

+ 133 - 0
examples/tutorial_client_events.c

@@ -0,0 +1,133 @@
+/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */
+
+#include "open62541.h"
+
+#include <signal.h>
+
+#ifdef UA_ENABLE_SUBSCRIPTIONS
+static void
+handler_events(const UA_UInt32 monId, const size_t nEventFields, const UA_Variant *eventFields, void *context) {
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Notification");
+    for(size_t i = 0; i < nEventFields; ++i) {
+        if (UA_Variant_hasScalarType(&eventFields[i], &UA_TYPES[UA_TYPES_UINT16])) {
+            UA_UInt16 severity = *(UA_UInt16 *)eventFields[i].data;
+            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Severity: %u", severity);
+        } else if (UA_Variant_hasScalarType(&eventFields[i], &UA_TYPES[UA_TYPES_LOCALIZEDTEXT])) {
+            UA_LocalizedText *lt = (UA_LocalizedText *)eventFields[i].data;
+            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
+                        "Message: '%.*s'", (int)lt->text.length, lt->text.data);
+        }
+        else {
+            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
+                        "Don't know how to handle type: '%s'", eventFields[i].type->typeName);
+        }
+    }
+}
+
+static UA_Boolean running = true;
+static void stopHandler(int sig) {
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "received ctrl-c");
+    running = false;
+}
+
+const size_t nSelectClauses = 2;
+
+static UA_SimpleAttributeOperand *
+setupSelectClauses(void)
+{
+    UA_SimpleAttributeOperand *selectClauses = (UA_SimpleAttributeOperand *)UA_Array_new(nSelectClauses, &UA_TYPES[UA_TYPES_SIMPLEATTRIBUTEOPERAND]);
+    if(!selectClauses){
+        return NULL;
+    }
+
+    for(size_t i =0; i<nSelectClauses; ++i) {
+        UA_SimpleAttributeOperand_init(&selectClauses[i]);
+    }
+
+    selectClauses[0].typeDefinitionId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEEVENTTYPE);
+    selectClauses[0].browsePathSize = 1;
+    selectClauses[0].browsePath = (UA_QualifiedName*)UA_Array_new(selectClauses[0].browsePathSize, &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
+    if(!selectClauses[0].browsePath) {
+        UA_SimpleAttributeOperand_delete(selectClauses);
+        return NULL;
+    }
+    selectClauses[0].attributeId = UA_ATTRIBUTEID_VALUE;
+    selectClauses[0].browsePath[0] = UA_QUALIFIEDNAME_ALLOC(0, "Message");
+
+    selectClauses[1].typeDefinitionId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEEVENTTYPE);
+    selectClauses[1].browsePathSize = 1;
+    selectClauses[1].browsePath = (UA_QualifiedName*)UA_Array_new(selectClauses[1].browsePathSize, &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
+    if(!selectClauses[1].browsePath) {
+        UA_SimpleAttributeOperand_deleteMembers(selectClauses);
+        UA_SimpleAttributeOperand_delete(selectClauses);
+        return NULL;
+    }
+    selectClauses[1].attributeId = UA_ATTRIBUTEID_VALUE;
+    selectClauses[1].browsePath[0] = UA_QUALIFIEDNAME_ALLOC(0, "Severity");
+
+    return selectClauses;
+}
+
+#endif
+
+int main(int argc, char *argv[]) {
+    signal(SIGINT, stopHandler);
+    signal(SIGTERM, stopHandler);
+
+    UA_Client *client = UA_Client_new(UA_ClientConfig_default);
+
+    UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://uademo.prosysopc.com:53530/OPCUA/SimulationServer");
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_Client_delete(client);
+        return (int)retval;
+    }
+
+#ifdef UA_ENABLE_SUBSCRIPTIONS
+    /* Create a subscription */
+    UA_UInt32 subId = 0;
+    retval = UA_Client_Subscriptions_new(client, UA_SubscriptionSettings_default, &subId);
+    if(!subId) {
+        UA_Client_disconnect(client);
+        UA_Client_delete(client);
+        return (int)retval;
+    }
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Create subscription succeeded, id %u", subId);
+
+    /* Add a MonitoredItem */
+    UA_NodeId monitorThis = UA_NODEID_NUMERIC(0, 2253); // Root->Objects->Server
+    UA_UInt32 monId = 0;
+
+    UA_SimpleAttributeOperand *selectClauses = setupSelectClauses();
+    if(!selectClauses){
+        UA_Client_Subscriptions_remove(client, subId);
+        UA_Client_disconnect(client);
+        UA_Client_delete(client);
+        return (int)UA_STATUSCODE_BADOUTOFMEMORY;
+    }
+
+    UA_Client_Subscriptions_addMonitoredEvent(client, subId, monitorThis, UA_ATTRIBUTEID_EVENTNOTIFIER,
+                                              selectClauses, nSelectClauses,
+                                              NULL, 0,
+                                              &handler_events, NULL, &monId);
+    if (!monId) {
+        UA_Array_delete(selectClauses, nSelectClauses, &UA_TYPES[UA_TYPES_SIMPLEATTRIBUTEOPERAND]);
+        UA_Client_Subscriptions_remove(client, subId);
+        UA_Client_disconnect(client);
+        UA_Client_delete(client);
+        return (int)retval;
+    }
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Monitoring 'Root->Objects->Server', id %u", subId);
+
+    while (running)
+        UA_Client_Subscriptions_manuallySendPublishRequest(client);
+
+    /* Delete the subscription */
+    if(!UA_Client_Subscriptions_remove(client, subId))
+        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Subscription removed");
+#endif
+
+    UA_Client_disconnect(client);
+    UA_Client_delete(client);
+    return (int) UA_STATUSCODE_GOOD;
+}

+ 15 - 0
include/ua_client_highlevel.h

@@ -587,6 +587,21 @@ UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId);
 UA_StatusCode UA_EXPORT
 UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client);
 
+typedef void (*UA_MonitoredEventHandlingFunction)(const UA_UInt32 monId,
+                                                  const size_t nEventFields,
+                                                  const UA_Variant *eventFields,
+                                                  void *context);
+
+UA_StatusCode UA_EXPORT
+UA_Client_Subscriptions_addMonitoredEvent(UA_Client *client, const UA_UInt32 subscriptionId,
+                                          const UA_NodeId nodeId, const UA_UInt32 attributeID,
+                                          UA_SimpleAttributeOperand *selectClause,
+                                          const size_t nSelectClauses,
+                                          UA_ContentFilterElement *whereClause,
+                                          const size_t nWhereClauses,
+                                          const UA_MonitoredEventHandlingFunction hf,
+                                          void *hfContext, UA_UInt32 *newMonitoredItemId);
+
 typedef void (*UA_MonitoredItemHandlingFunction)(UA_UInt32 monId,
                                                  UA_DataValue *value,
                                                  void *context);

+ 122 - 20
src/client/ua_client_highlevel_subscriptions.c

@@ -111,6 +111,93 @@ UA_Client_Subscriptions_forceDelete(UA_Client *client,
     UA_free(sub);
 }
 
+UA_StatusCode
+UA_Client_Subscriptions_addMonitoredEvent(UA_Client *client, const UA_UInt32 subscriptionId,
+                                         const UA_NodeId nodeId, const UA_UInt32 attributeID,
+                                         UA_SimpleAttributeOperand *selectClause,
+                                         const size_t nSelectClauses,
+                                         UA_ContentFilterElement *whereClause,
+                                         const size_t nWhereClauses,
+                                         const UA_MonitoredEventHandlingFunction hf,
+                                         void *hfContext, UA_UInt32 *newMonitoredItemId) {
+    UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
+    if(!sub)
+        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+
+    /* Send the request */
+    UA_CreateMonitoredItemsRequest request;
+    UA_CreateMonitoredItemsRequest_init(&request);
+    request.subscriptionId = subscriptionId;
+
+    UA_MonitoredItemCreateRequest item;
+    UA_MonitoredItemCreateRequest_init(&item);
+    item.itemToMonitor.nodeId = nodeId;
+    item.itemToMonitor.attributeId = attributeID;
+    item.monitoringMode = UA_MONITORINGMODE_REPORTING;
+    item.requestedParameters.clientHandle = ++(client->monitoredItemHandles);
+    item.requestedParameters.samplingInterval = 0;
+    item.requestedParameters.discardOldest = false;
+
+    UA_EventFilter *evFilter = UA_EventFilter_new();
+    if(!evFilter) {
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    }
+    UA_EventFilter_init(evFilter);
+    evFilter->selectClausesSize = nSelectClauses;
+    evFilter->selectClauses = selectClause;
+    evFilter->whereClause.elementsSize = nWhereClauses;
+    evFilter->whereClause.elements = whereClause;
+
+    item.requestedParameters.filter.encoding = UA_EXTENSIONOBJECT_DECODED_NODELETE;
+    item.requestedParameters.filter.content.decoded.type = &UA_TYPES[UA_TYPES_EVENTFILTER];
+    item.requestedParameters.filter.content.decoded.data = evFilter;
+
+    request.itemsToCreate = &item;
+    request.itemsToCreateSize = 1;
+    UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
+
+    // slight misuse of retval here to check if the deletion was successfull.
+    UA_StatusCode retval;
+    if(response.resultsSize == 0)
+        retval = response.responseHeader.serviceResult;
+    else
+        retval = response.results[0].statusCode;
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_CreateMonitoredItemsResponse_deleteMembers(&response);
+        UA_EventFilter_delete(evFilter);
+        return retval;
+    }
+
+    /* Create the handler */
+    UA_Client_MonitoredItem *newMon = (UA_Client_MonitoredItem *)UA_malloc(sizeof(UA_Client_MonitoredItem));
+    if(!newMon) {
+        UA_CreateMonitoredItemsResponse_deleteMembers(&response);
+        UA_EventFilter_delete(evFilter);
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    }
+
+    newMon->monitoringMode = UA_MONITORINGMODE_REPORTING;
+    UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId);
+    newMon->attributeID = attributeID;
+    newMon->clientHandle = client->monitoredItemHandles;
+    newMon->samplingInterval = 0;
+    newMon->queueSize = 0;
+    newMon->discardOldest = false;
+
+    newMon->handlerEvents = hf;
+    newMon->handlerEventsContext = hfContext;
+    newMon->monitoredItemId = response.results[0].monitoredItemId;
+    LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
+    *newMonitoredItemId = newMon->monitoredItemId;
+
+    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                 "Created a monitored item with client handle %u", client->monitoredItemHandles);
+
+    UA_EventFilter_delete(evFilter);
+    UA_CreateMonitoredItemsResponse_deleteMembers(&response);
+    return UA_STATUSCODE_GOOD;
+}
+
 UA_StatusCode
 UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
                                          UA_NodeId nodeId, UA_UInt32 attributeID,
@@ -229,10 +316,6 @@ UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
     if(!sub)
         return;
 
-    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                 "Processing a publish response on subscription %u with %u notifications",
-                 sub->subscriptionID, (unsigned int)response->notificationMessage.notificationDataSize);
-
     /* Check if the server has acknowledged any of the sent ACKs */
     for(size_t i = 0; i < response->resultsSize && i < request->subscriptionAcknowledgementsSize; ++i) {
         /* remove also acks that are unknown to the server */
@@ -260,24 +343,43 @@ UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
         if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
             continue;
 
-        /* Currently only dataChangeNotifications are supported */
-        if(msg->notificationData[k].content.decoded.type != &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION])
-            continue;
-
-        UA_DataChangeNotification *dataChangeNotification = (UA_DataChangeNotification *)msg->notificationData[k].content.decoded.data;
-        for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
-            UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
-            UA_Client_MonitoredItem *mon;
-            LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
-                if(mon->clientHandle == mitemNot->clientHandle) {
-                    mon->handler(mon->monitoredItemId, &mitemNot->value, mon->handlerContext);
-                    break;
+        if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
+            UA_DataChangeNotification *dataChangeNotification = (UA_DataChangeNotification *)msg->notificationData[k].content.decoded.data;
+            for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
+                UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
+                UA_Client_MonitoredItem *mon;
+                LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
+                    if(mon->clientHandle == mitemNot->clientHandle) {
+                        mon->handler(mon->monitoredItemId, &mitemNot->value, mon->handlerContext);
+                        break;
+                    }
                 }
+                if(!mon)
+                    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                                 "Could not process a notification with clienthandle %u on subscription %u",
+                                 mitemNot->clientHandle, sub->subscriptionID);
             }
-            if(!mon)
-                UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                             "Could not process a notification with clienthandle %u on subscription %u",
-                             mitemNot->clientHandle, sub->subscriptionID);
+        }
+        else if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
+            UA_EventNotificationList *eventNotificationList = (UA_EventNotificationList *)msg->notificationData[k].content.decoded.data;
+            for (size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
+                UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
+                UA_Client_MonitoredItem *mon;
+                LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
+                    if(mon->clientHandle == eventFieldList->clientHandle) {
+                        mon->handlerEvents(mon->monitoredItemId, eventFieldList->eventFieldsSize,
+                                           eventFieldList->eventFields, mon->handlerContext);
+                        break;
+                    }
+                }
+                if(!mon)
+                    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                                 "Could not process a notification with clienthandle %u on subscription %u",
+                                 eventFieldList->clientHandle, sub->subscriptionID);
+            }
+        }
+        else {
+            continue; // no other types are supported
         }
     }
 

+ 2 - 0
src/client/ua_client_internal.h

@@ -31,6 +31,8 @@ typedef struct UA_Client_MonitoredItem {
     UA_Boolean discardOldest;
     void (*handler)(UA_UInt32 monId, UA_DataValue *value, void *context);
     void *handlerContext;
+    void (*handlerEvents)(const UA_UInt32 monId, const size_t nEventFields, const UA_Variant *eventFields, void *context);
+    void *handlerEventsContext;
 } UA_Client_MonitoredItem;
 
 typedef struct UA_Client_Subscription {

+ 5 - 2
tests/check_types_memory.c

@@ -105,7 +105,8 @@ START_TEST(encodeShallYieldDecode) {
 END_TEST
 
 START_TEST(decodeShallFailWithTruncatedBufferButSurvive) {
-    if (_i == UA_TYPES_DISCOVERYCONFIGURATION)
+    //Skip test for void*
+    if (_i == UA_TYPES_DISCOVERYCONFIGURATION || _i == UA_TYPES_FILTEROPERAND || _i == UA_TYPES_MONITORINGFILTER)
         return;
     // given
     UA_ByteString msg1;
@@ -126,7 +127,7 @@ START_TEST(decodeShallFailWithTruncatedBufferButSurvive) {
     // when
     void *obj2 = UA_new(&UA_TYPES[_i]);
     size_t offset = 0;
-    retval = UA_decodeBinary(&msg1, &offset, obj2, &UA_TYPES[_i], 0, NULL); 
+    retval = UA_decodeBinary(&msg1, &offset, obj2, &UA_TYPES[_i], 0, NULL);
     ck_assert_int_ne(retval, UA_STATUSCODE_GOOD);
     UA_delete(obj2, &UA_TYPES[_i]);
     UA_ByteString_deleteMembers(&msg1);
@@ -207,6 +208,8 @@ START_TEST(calcSizeBinaryShallBeCorrect) {
     if(_i == UA_TYPES_VARIANT ||
        _i == UA_TYPES_VARIABLEATTRIBUTES ||
        _i == UA_TYPES_VARIABLETYPEATTRIBUTES ||
+       _i == UA_TYPES_FILTEROPERAND ||
+       _i == UA_TYPES_MONITORINGFILTER ||
        _i == UA_TYPES_DISCOVERYCONFIGURATION)
         return;
     void *obj = UA_new(&UA_TYPES[_i]);

+ 7 - 0
tools/schema/datatypes_minimal.txt

@@ -171,3 +171,10 @@ FindServersOnNetworkResponse
 DataChangeTrigger
 DeadbandType
 DataChangeFilter
+MonitoringFilter
+EventFilter
+FilterOperand
+SimpleAttributeOperand
+EventNotificationList
+EventFieldList
+StatusChangeNotification