Przeglądaj źródła

client subscriptions improvement (#1467)

* client subscriptions improvement

* remove lastSendState

* fix typo
StalderT 6 lat temu
rodzic
commit
bc9e669ff2

+ 4 - 0
examples/CMakeLists.txt

@@ -58,6 +58,10 @@ add_example(client client.c)
 
 add_example(client_connect_loop client_connect_loop.c)
 
+if(UA_ENABLE_SUBSCRIPTIONS)
+add_example(client_subscription_loop client_subscription_loop.c)
+endif()
+
 ####################
 # Feature Examples #
 ####################

+ 1 - 1
examples/client_connect_loop.c

@@ -82,7 +82,7 @@ int main(void) {
             UA_Variant_hasScalarType(&value, &UA_TYPES[UA_TYPES_DATETIME])) {
             UA_DateTime raw_date = *(UA_DateTime *) value.data;
             UA_DateTimeStruct dts = UA_DateTime_toStruct(raw_date);
-            UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "date is: %u-%u-%u %u:%u:%u.%03u",
+            UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "date is: %02u-%02u-%04u %02u:%02u:%02u.%03u",
                         dts.day, dts.month, dts.year, dts.hour, dts.min, dts.sec, dts.milliSec);
         }
         UA_Variant_deleteMembers(&value);

+ 126 - 0
examples/client_subscription_loop.c

@@ -0,0 +1,126 @@
+/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */
+
+/* Enable POSIX features */
+#ifndef _XOPEN_SOURCE
+# define _XOPEN_SOURCE 600
+#endif
+#ifndef _DEFAULT_SOURCE
+# define _DEFAULT_SOURCE
+#endif
+/* On older systems we need to define _BSD_SOURCE.
+ * _DEFAULT_SOURCE is an alias for that. */
+#ifndef _BSD_SOURCE
+# define _BSD_SOURCE
+#endif
+
+/**
+ * Client disconnect handling
+ * --------------------------
+ * This example shows you how to handle a client disconnect, e.g., if the server
+ * is shut down while the client is connected. You just need to call connect
+ * again and the client will automatically reconnect.
+ *
+ * This example is very similar to the tutorial_client_firststeps.c. */
+
+#include "open62541.h"
+#include <signal.h>
+
+#ifdef _WIN32
+# include <windows.h>
+# define UA_sleep_ms(X) Sleep(X)
+#else
+# include <unistd.h>
+# define UA_sleep_ms(X) usleep(X * 1000)
+#endif
+
+UA_Boolean running = true;
+UA_Logger logger = UA_Log_Stdout;
+
+static void stopHandler(int sign) {
+    UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
+    running = 0;
+}
+
+static void
+handler_currentTimeChanged(UA_UInt32 monId, UA_DataValue *value, void *context) {
+    UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "currentTime has changed!");
+    if (UA_Variant_hasScalarType(&value->value, &UA_TYPES[UA_TYPES_DATETIME])) {
+        UA_DateTime raw_date = *(UA_DateTime *) value->value.data;
+        UA_DateTimeStruct dts = UA_DateTime_toStruct(raw_date);
+        UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "date is: %02u-%02u-%04u %02u:%02u:%02u.%03u",
+                        dts.day, dts.month, dts.year, dts.hour, dts.min, dts.sec, dts.milliSec);
+    }
+}
+
+static void
+stateCallback (UA_Client *client, UA_ClientState clientState){
+
+    switch (clientState){
+        case UA_CLIENTSTATE_DISCONNECTED:
+            UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "The client is disconnected");
+        break; 
+        case UA_CLIENTSTATE_CONNECTED:
+            UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A TCP connection to the server is open");
+        break; 
+        case UA_CLIENTSTATE_SECURECHANNEL:
+            UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A SecureChannel to the server is open");
+        break; 
+        case UA_CLIENTSTATE_SESSION:{
+            UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A session with the server is open");
+            /* A new session was created. We need to create the subscription. */
+            /* Create a subscription */
+            UA_UInt32 subId = 0;
+            UA_Client_Subscriptions_new(client, UA_SubscriptionSettings_default, &subId);
+            if(subId)
+                UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Create subscription succeeded, id %u", subId);
+            else
+                return;
+
+            /* Add a MonitoredItem */
+            UA_UInt32 monId = 0;
+            UA_NodeId monitorThis = UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
+            UA_Client_Subscriptions_addMonitoredItem(client, subId, monitorThis, UA_ATTRIBUTEID_VALUE,
+                                                     &handler_currentTimeChanged, NULL, &monId, 250);
+            if (monId)
+                UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Monitoring UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME', id %u", subId);
+        }
+        break; 
+        case UA_CLIENTSTATE_SESSION_RENEWED:
+            UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A session with the server is open (renewed)");
+            /* The session was renewed. We don't need to recreate the subscription. */
+        break; 
+    }
+    return;
+}
+
+int main(void) {
+    signal(SIGINT, stopHandler); /* catches ctrl-c */
+
+    UA_ClientConfig config = UA_ClientConfig_default;
+    /* Set stateCallback */
+    config.stateCallback = stateCallback;
+    UA_Client *client = UA_Client_new(config);
+
+    /* Endless loop SendPublishRequest */
+    while (running) {
+        /* if already connected, this will return GOOD and do nothing */
+        /* if the connection is closed/errored, the connection will be reset and then reconnected */
+        /* Alternatively you can also use UA_Client_getState to get the current state */
+        UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
+        if (retval != UA_STATUSCODE_GOOD) {
+            UA_LOG_ERROR(logger, UA_LOGCATEGORY_USERLAND, "Not connected. Retrying to connect in 1 second");
+            /* The connect may timeout after 1 second (see above) or it may fail immediately on network errors */
+            /* E.g. name resolution errors or unreachable network. Thus there should be a small sleep here */
+            UA_sleep_ms(1000);
+            continue;
+        }
+
+        UA_Client_Subscriptions_manuallySendPublishRequest(client);
+        UA_sleep_ms(1000);
+    };
+
+    /* Clean up */
+    UA_Client_delete(client); /* Disconnects the client internally */
+    return UA_STATUSCODE_GOOD;
+}

+ 24 - 17
include/ua_client.h

@@ -32,6 +32,27 @@ extern "C" {
  * `UA_Client_Subscriptions_manuallySendPublishRequest`. See also :ref:`here
  * <client-subscriptions>`.
  *
+ * Client Lifecycle
+ * ---------------- */
+
+typedef enum {
+    UA_CLIENTSTATE_DISCONNECTED,        /* The client is disconnected */
+    UA_CLIENTSTATE_CONNECTED,           /* A TCP connection to the server is open */
+    UA_CLIENTSTATE_SECURECHANNEL,       /* A SecureChannel to the server is open */
+    UA_CLIENTSTATE_SESSION,             /* A session with the server is open */
+    UA_CLIENTSTATE_SESSION_RENEWED      /* A session with the server is open (renewed) */
+} UA_ClientState;
+
+struct UA_Client;
+typedef struct UA_Client UA_Client;
+
+/**
+ * Client Lifecycle callback
+ * ------------------------- */
+
+typedef void (*UA_ClientStateCallback)(UA_Client *client, UA_ClientState clientState);
+
+/**
  * Client Configuration
  * -------------------- */
 
@@ -46,25 +67,11 @@ typedef struct UA_ClientConfig {
     /* Custom DataTypes */
     size_t customDataTypesSize;
     const UA_DataType *customDataTypes;
-} UA_ClientConfig;
 
-/**
- * Client Lifecycle
- * ---------------- */
-
-typedef enum {
-    UA_CLIENTSTATE_DISCONNECTED,        /* The client is not connected */
-    UA_CLIENTSTATE_CONNECTED,           /* A TCP connection to the server is open */
-    UA_CLIENTSTATE_SECURECHANNEL,       /* A SecureChannel to the server is open */
-    UA_CLIENTSTATE_SESSION,             /* A session with the server is open */
-    UA_CLIENTSTATE_SESSION_DISCONNECTED /* A session with the server is open.
-                                         * But the SecureChannel was lost. Try
-                                         * to establish a new SecureChannel and
-                                         * reattach the existing session. */
-} UA_ClientState;
+    /* Callback function */
+    UA_ClientStateCallback stateCallback;
+} UA_ClientConfig;
 
-struct UA_Client;
-typedef struct UA_Client UA_Client;
 
 /* Create a new client */
 UA_Client UA_EXPORT *

+ 2 - 1
plugins/ua_config_default.c

@@ -301,7 +301,8 @@ const UA_ClientConfig UA_ClientConfig_default = {
     UA_ClientConnectionTCP, /* .connectionFunc */
 
     0, /* .customDataTypesSize */
-    NULL /*.customDataTypes */
+    NULL, /*.customDataTypes */
+    NULL /*.stateCallback */
 };
 
 /****************************************/

+ 3 - 8
src/client/ua_client.c

@@ -22,6 +22,8 @@ UA_Client_init(UA_Client* client, UA_ClientConfig config) {
     client->channel.securityPolicy = &client->securityPolicy;
     client->channel.securityMode = UA_MESSAGESECURITYMODE_NONE;
     client->config = config;
+    if (client->config.stateCallback)
+        client->config.stateCallback(client, client->state);
 }
 
 UA_Client *
@@ -57,14 +59,7 @@ UA_Client_deleteMembers(UA_Client* client) {
 
     /* Delete the subscriptions */
 #ifdef UA_ENABLE_SUBSCRIPTIONS
-    UA_Client_NotificationsAckNumber *n, *tmp;
-    LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
-        LIST_REMOVE(n, listEntry);
-        UA_free(n);
-    }
-    UA_Client_Subscription *sub, *tmps;
-    LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
-        UA_Client_Subscriptions_forceDelete(client, sub); /* force local removal */
+    UA_Client_Subscriptions_clean(client);
 #endif
 }
 

+ 30 - 11
src/client/ua_client_connect.c

@@ -12,6 +12,19 @@
 
 #define UA_MINMESSAGESIZE 8192
 
+
+ /********************/
+ /* Set client state */
+ /********************/
+static void
+setClientState(UA_Client *client, UA_ClientState state)
+{
+    if (client->state != state){
+        client->state = state;
+        if (client->config.stateCallback)
+            client->config.stateCallback(client, client->state);
+    }
+}
  /***********************/
  /* Open the Connection */
  /***********************/
@@ -390,14 +403,14 @@ UA_Client_connectInternal(UA_Client *client, const char *endpointUrl,
     retval = HelAckHandshake(client);
     if(retval != UA_STATUSCODE_GOOD)
         goto cleanup;
-    client->state = UA_CLIENTSTATE_CONNECTED;
+    setClientState(client, UA_CLIENTSTATE_CONNECTED);
 
     /* Open a SecureChannel. TODO: Select with endpoint  */
     client->channel.connection = &client->connection;
     retval = openSecureChannel(client, false);
     if(retval != UA_STATUSCODE_GOOD)
         goto cleanup;
-    client->state = UA_CLIENTSTATE_SECURECHANNEL;
+    setClientState(client, UA_CLIENTSTATE_SECURECHANNEL);
 
     /* Try to activate an existing Session for this SecureChannel */
     if((!UA_NodeId_equal(&client->authenticationToken, &UA_NODEID_NULL)) && (createNewSession)) {
@@ -408,7 +421,7 @@ UA_Client_connectInternal(UA_Client *client, const char *endpointUrl,
         }else{
             if(retval != UA_STATUSCODE_GOOD)
                 goto cleanup;
-            client->state = UA_CLIENTSTATE_SESSION;
+            setClientState(client, UA_CLIENTSTATE_SESSION_RENEWED);
             return retval;
         }
     }else{
@@ -427,12 +440,16 @@ UA_Client_connectInternal(UA_Client *client, const char *endpointUrl,
         retval = createSession(client);
         if(retval != UA_STATUSCODE_GOOD)
             goto cleanup;
+#ifdef UA_ENABLE_SUBSCRIPTIONS
+        /* A new session has been created. We need to clean up the subscriptions */
+        UA_Client_Subscriptions_clean(client);
+#endif
         retval = activateSession(client);
+        if(retval != UA_STATUSCODE_GOOD)
+            goto cleanup;
+        setClientState(client, UA_CLIENTSTATE_SESSION);
     }
 
-    if(retval != UA_STATUSCODE_GOOD)
-        goto cleanup;
-    client->state = UA_CLIENTSTATE_SESSION;
     return retval;
 
 cleanup:
@@ -501,22 +518,24 @@ sendCloseSecureChannel(UA_Client *client) {
 UA_StatusCode
 UA_Client_disconnect(UA_Client *client) {
     /* Is a session established? */
-    if(client->state == UA_CLIENTSTATE_SESSION){
-        client->state = UA_CLIENTSTATE_SESSION_DISCONNECTED;
+    if(client->state >= UA_CLIENTSTATE_SESSION){
+        client->state = UA_CLIENTSTATE_SECURECHANNEL;
         sendCloseSession(client);
     }
     UA_NodeId_deleteMembers(&client->authenticationToken);
     client->requestHandle = 0;
 
     /* Is a secure channel established? */
-    if(client->state >= UA_CLIENTSTATE_SECURECHANNEL)
+    if(client->state >= UA_CLIENTSTATE_SECURECHANNEL){
+        client->state = UA_CLIENTSTATE_CONNECTED;
         sendCloseSecureChannel(client);
+    }
 
     /* Close the TCP connection */
     if(client->connection.state != UA_CONNECTION_CLOSED)
         client->connection.close(&client->connection);
 
-    client->state = UA_CLIENTSTATE_DISCONNECTED;
+    setClientState(client, UA_CLIENTSTATE_DISCONNECTED);
     return UA_STATUSCODE_GOOD;
 }
 
@@ -531,6 +550,6 @@ UA_Client_close(UA_Client *client) {
     if(client->connection.state != UA_CONNECTION_CLOSED)
         client->connection.close(&client->connection);
 
-    client->state = UA_CLIENTSTATE_DISCONNECTED;
+    setClientState(client, UA_CLIENTSTATE_DISCONNECTED);
     return UA_STATUSCODE_GOOD;
 }

+ 13 - 0
src/client/ua_client_highlevel_subscriptions.c

@@ -497,4 +497,17 @@ UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
     return retval;
 }
 
+void
+UA_Client_Subscriptions_clean(UA_Client *client){
+    UA_Client_NotificationsAckNumber *n, *tmp;
+    LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
+        LIST_REMOVE(n, listEntry);
+        UA_free(n);
+    }
+
+    UA_Client_Subscription *sub, *tmps;
+    LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
+        UA_Client_Subscriptions_forceDelete(client, sub); /* force local removal */
+}
+
 #endif /* UA_ENABLE_SUBSCRIPTIONS */

+ 3 - 0
src/client/ua_client_internal.h

@@ -52,6 +52,8 @@ typedef struct UA_Client_Subscription {
 
 void UA_Client_Subscriptions_forceDelete(UA_Client *client, UA_Client_Subscription *sub);
 
+void UA_Client_Subscriptions_clean(UA_Client *client);
+
 #endif
 
 /**********/
@@ -74,6 +76,7 @@ typedef enum {
 struct UA_Client {
     /* State */
     UA_ClientState state;
+
     UA_ClientConfig config;
 
     /* Connection */