소스 검색

Add async subscription handling for the client

StalderT 6 년 전
부모
커밋
b5dc40efef

+ 2 - 2
examples/client.c

@@ -105,7 +105,7 @@ int main(int argc, char *argv[]) {
     if(monId)
         printf("Monitoring 'the.answer', id %u\n", monId);
     /* The first publish request should return the initial value of the variable */
-    UA_Client_Subscriptions_manuallySendPublishRequest(client);
+    UA_Client_runAsync(client, 1000);
 #endif
 
     /* Read attribute */
@@ -148,7 +148,7 @@ int main(int argc, char *argv[]) {
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     /* Take another look at the.answer */
-    UA_Client_Subscriptions_manuallySendPublishRequest(client);
+    UA_Client_runAsync(client, 100);
     /* Delete the subscription */
     if(!UA_Client_Subscriptions_remove(client, subId))
         printf("Subscription removed\n");

+ 4 - 4
examples/client_subscription_loop.c

@@ -100,10 +100,11 @@ int main(void) {
     UA_ClientConfig config = UA_ClientConfig_default;
     /* Set stateCallback */
     config.stateCallback = stateCallback;
+
     UA_Client *client = UA_Client_new(config);
 
-    /* Endless loop SendPublishRequest */
-    while(running) {
+    /* Endless loop runAsync */
+    while (running) {
         /* if already connected, this will return GOOD and do nothing */
         /* if the connection is closed/errored, the connection will be reset and then reconnected */
         /* Alternatively you can also use UA_Client_getState to get the current state */
@@ -116,8 +117,7 @@ int main(void) {
             continue;
         }
 
-        UA_Client_Subscriptions_manuallySendPublishRequest(client);
-        UA_sleep_ms(1000);
+        UA_Client_runAsync(client, 1000);
     };
 
     /* Clean up */

+ 1 - 1
examples/tutorial_client_events.c

@@ -128,7 +128,7 @@ int main(int argc, char *argv[]) {
     }
 
     while (running)
-        UA_Client_Subscriptions_manuallySendPublishRequest(client);
+        UA_Client_runAsync(client, 100);
 
     /* Delete the subscription */
  cleanup:

+ 69 - 4
include/ua_client.h

@@ -81,7 +81,16 @@ typedef struct UA_ClientConfig {
 
     /* Callback function */
     UA_ClientStateCallback stateCallback;
+
     void *clientContext;
+
+    /* PublishResponse Timeout for background async process in ms */
+    /* 0 = time out disabled                                      */
+    UA_UInt32 backgroundPublishResponseTimeout;
+
+    /* number of PublishResponse standing in the sever */
+    /* 0 = background task disabled                    */
+    UA_UInt16 outStandingPublishRequests;
 } UA_ClientConfig;
 
 
@@ -441,11 +450,30 @@ UA_Client_Service_publish(UA_Client *client, const UA_PublishRequest request) {
  * be made without waiting for a response first. Responess may come in a
  * different ordering. */
 
+/* Listen on the network and process arriving asynchronous responses in the
+ * background. Internal housekeeping and subscription management is done as
+ * well. */
+UA_StatusCode UA_EXPORT
+UA_Client_runAsync(UA_Client *client, UA_UInt16 timeout);
+
 typedef void
 (*UA_ClientAsyncServiceCallback)(UA_Client *client, void *userdata,
-                                 UA_UInt32 requestId, const void *response);
+                                 UA_UInt32 requestId, void *response,
+                                 const UA_DataType *responseType);
 
-/* Don't use this function. Use the type versions below instead. */
+/* Use the type versions of this method. See below. However, the general
+ * mechanism of async service calls is explained here.
+ *
+ * We say that an async service call has been dispatched once this method
+ * returns UA_STATUSCODE_GOOD. If there is an error after an async service has
+ * been dispatched, the callback is called with an "empty" response where the
+ * statusCode has been set accordingly. This is also done if the client is
+ * shutting down and the list of dispatched async services is emptied.
+ *
+ * The statusCode received when the client is shutting down is
+ * UA_STATUSCODE_BADSHUTDOWN.
+ *
+ * The userdata and requestId arguments can be NULL. */
 UA_StatusCode UA_EXPORT
 __UA_Client_AsyncService(UA_Client *client, const void *request,
                          const UA_DataType *requestType,
@@ -453,8 +481,45 @@ __UA_Client_AsyncService(UA_Client *client, const void *request,
                          const UA_DataType *responseType,
                          void *userdata, UA_UInt32 *requestId);
 
-UA_StatusCode UA_EXPORT
-UA_Client_runAsync(UA_Client *client, UA_UInt16 timeout);
+static UA_INLINE UA_StatusCode
+UA_Client_AsyncService_read(UA_Client *client, const UA_ReadRequest *request,
+                            UA_ClientAsyncServiceCallback callback,
+                            void *userdata, UA_UInt32 *requestId) {
+    return __UA_Client_AsyncService(client, (const void*)request,
+                                    &UA_TYPES[UA_TYPES_READREQUEST], callback,
+                                    &UA_TYPES[UA_TYPES_READRESPONSE],
+                                    userdata, requestId);
+}
+
+static UA_INLINE UA_StatusCode
+UA_Client_AsyncService_write(UA_Client *client, const UA_WriteRequest *request,
+                             UA_ClientAsyncServiceCallback callback,
+                             void *userdata, UA_UInt32 *requestId) {
+    return __UA_Client_AsyncService(client, (const void*)request,
+                                    &UA_TYPES[UA_TYPES_WRITEREQUEST], callback, 
+                                    &UA_TYPES[UA_TYPES_WRITERESPONSE],
+                                    userdata, requestId);
+}
+
+static UA_INLINE UA_StatusCode
+UA_Client_AsyncService_call(UA_Client *client, const UA_CallRequest *request,
+                            UA_ClientAsyncServiceCallback callback,
+                            void *userdata, UA_UInt32 *requestId) {
+    return __UA_Client_AsyncService(client, (const void*)request,
+                                    &UA_TYPES[UA_TYPES_CALLREQUEST], callback,
+                                    &UA_TYPES[UA_TYPES_CALLRESPONSE],
+                                    userdata, requestId);
+}
+
+static UA_INLINE UA_StatusCode
+UA_Client_AsyncService_browse(UA_Client *client, const UA_BrowseRequest *request,
+                              UA_ClientAsyncServiceCallback callback,
+                              void *userdata, UA_UInt32 *requestId) {
+    return __UA_Client_AsyncService(client, (const void*)request,
+                                    &UA_TYPES[UA_TYPES_BROWSEREQUEST], callback,
+                                    &UA_TYPES[UA_TYPES_BROWSERESPONSE],
+                                    userdata, requestId);
+}
 
 /**
  * .. toctree::

+ 4 - 1
include/ua_client_highlevel.h

@@ -592,7 +592,10 @@ UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
 UA_StatusCode UA_EXPORT
 UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId);
 
-UA_StatusCode UA_EXPORT
+/* Send a publish request and wait until a response to the request is processed.
+ * Note that other publish responses may be processed in the background until
+ * then. */
+UA_DEPRECATED UA_StatusCode UA_EXPORT
 UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client);
 
 /* Addition of monitored DataChanges */

+ 12 - 7
plugins/ua_config_default.c

@@ -455,18 +455,23 @@ const UA_ClientConfig UA_ClientConfig_default = {
     5000, /* .timeout, 5 seconds */
     10 * 60 * 1000, /* .secureChannelLifeTime, 10 minutes */
     UA_Log_Stdout, /* .logger */
-    /* .localConnectionConfig */
-    {0, /* .protocolVersion */
-     65535, /* .sendBufferSize, 64k per chunk */
-     65535, /* .recvBufferSize, 64k per chunk */
-     0, /* .maxMessageSize, 0 -> unlimited */
-     0}, /* .maxChunkCount, 0 -> unlimited */
+    { /* .localConnectionConfig */
+        0, /* .protocolVersion */
+        65535, /* .sendBufferSize, 64k per chunk */
+        65535, /* .recvBufferSize, 64k per chunk */
+        0, /* .maxMessageSize, 0 -> unlimited */
+        0 /* .maxChunkCount, 0 -> unlimited */
+    },
     UA_ClientConnectionTCP, /* .connectionFunc */
 
     0, /* .customDataTypesSize */
     NULL, /*.customDataTypes */
+
     NULL, /*.stateCallback */
-    NULL  /*.clientContext */
+    NULL,  /*.clientContext */
+
+    0, /* .backgroundPublishResponseTimeout */
+    0 /* .outStandingPublishRequests */
 };
 
 /****************************************/

+ 30 - 6
src/client/ua_client.c

@@ -65,11 +65,7 @@ UA_Client_deleteMembers(UA_Client* client) {
         UA_String_deleteMembers(&client->password);
 
     /* Delete the async service calls */
-    AsyncServiceCall *ac, *ac_tmp;
-    LIST_FOREACH_SAFE(ac, &client->asyncServiceCalls, pointers, ac_tmp) {
-        LIST_REMOVE(ac, pointers);
-        UA_free(ac);
-    }
+    UA_Client_AsyncService_removeAll(client, UA_STATUSCODE_BADSHUTDOWN);
 
     /* Delete the subscriptions */
 #ifdef UA_ENABLE_SUBSCRIPTIONS
@@ -166,7 +162,7 @@ processAsyncResponse(UA_Client *client, UA_UInt32 requestId, UA_NodeId *response
 
     /* Call the callback */
     if(retval == UA_STATUSCODE_GOOD) {
-        ac->callback(client, ac->userdata, requestId, response);
+        ac->callback(client, ac->userdata, requestId, response, ac->responseType);
         UA_deleteMembers(response, ac->responseType);
     } else {
         UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
@@ -346,6 +342,29 @@ __UA_Client_Service(UA_Client *client, const void *request,
         respHeader->serviceResult = retval;
 }
 
+void
+UA_Client_AsyncService_cancel(UA_Client *client, AsyncServiceCall *ac,
+                              UA_StatusCode statusCode) {
+    /* Create an empty response with the statuscode */
+    void *resp = UA_alloca(ac->responseType->memSize);
+    UA_init(resp, ac->responseType);
+    ((UA_ResponseHeader*)resp)->serviceResult = statusCode;
+
+    ac->callback(client, ac->userdata, ac->requestId, resp, ac->responseType);
+
+    /* Clean up the response. Users might move data into it. For whatever reasons. */
+    UA_deleteMembers(resp, ac->responseType);
+}
+
+void UA_Client_AsyncService_removeAll(UA_Client *client, UA_StatusCode statusCode) {
+    AsyncServiceCall *ac, *ac_tmp;
+    LIST_FOREACH_SAFE(ac, &client->asyncServiceCalls, pointers, ac_tmp) {
+        LIST_REMOVE(ac, pointers);
+        UA_Client_AsyncService_cancel(client, ac, UA_STATUSCODE_BADSHUTDOWN);
+        UA_free(ac);
+    }
+}
+
 UA_StatusCode
 __UA_Client_AsyncService(UA_Client *client, const void *request,
                          const UA_DataType *requestType,
@@ -377,6 +396,11 @@ __UA_Client_AsyncService(UA_Client *client, const void *request,
 UA_StatusCode
 UA_Client_runAsync(UA_Client *client, UA_UInt16 timeout) {
     /* TODO: Call repeated jobs that are scheduled */
+#ifdef UA_ENABLE_SUBSCRIPTIONS
+    UA_StatusCode retvalPublish = UA_Client_Subscriptions_backgroundPublish(client);
+    if (retvalPublish != UA_STATUSCODE_GOOD)
+        return retvalPublish;
+#endif
     UA_DateTime maxDate = UA_DateTime_nowMonotonic() + (timeout * UA_DATETIME_MSEC);
     UA_StatusCode retval = receiveServiceResponse(client, NULL, NULL, maxDate, NULL);
     if(retval == UA_STATUSCODE_GOODNONCRITICALTIMEOUT)

+ 16 - 0
src/client/ua_client_connect.c

@@ -419,6 +419,18 @@ UA_Client_connectInternal(UA_Client *client, const char *endpointUrl,
         goto cleanup;
     setClientState(client, UA_CLIENTSTATE_SECURECHANNEL);
 
+
+    /* Delete async service. TODO: Move this from connect to the disconnect/cleanup phase */
+    UA_Client_AsyncService_removeAll(client, UA_STATUSCODE_BADSHUTDOWN);
+
+#ifdef UA_ENABLE_SUBSCRIPTIONS
+    client->currentlyOutStandingPublishRequests = 0;
+#endif
+
+// TODO: actually, reactivate an existing session is working, but currently republish is not implemented
+// This option is disabled until we have a good implementation of the subscription recovery.
+
+#ifdef UA_SESSION_RECOVERY
     /* Try to activate an existing Session for this SecureChannel */
     if((!UA_NodeId_equal(&client->authenticationToken, &UA_NODEID_NULL)) && (createNewSession)) {
         retval = activateSession(client);
@@ -434,6 +446,10 @@ UA_Client_connectInternal(UA_Client *client, const char *endpointUrl,
     } else {
         UA_NodeId_deleteMembers(&client->authenticationToken);
     }
+#else
+    UA_NodeId_deleteMembers(&client->authenticationToken);
+#endif /* UA_SESSION_RECOVERY */
+
 
     /* Get Endpoints */
     if(endpointsHandshake) {

+ 281 - 135
src/client/ua_client_highlevel_subscriptions.c

@@ -43,6 +43,8 @@ UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
     }
 
     LIST_INIT(&newSub->monitoredItems);
+    newSub->sequenceNumber = 0;
+    newSub->lastActivity = UA_DateTime_nowMonotonic();
     newSub->lifeTime = response.revisedLifetimeCount;
     newSub->keepAliveCount = response.revisedMaxKeepAliveCount;
     newSub->publishingInterval = response.revisedPublishingInterval;
@@ -58,8 +60,8 @@ UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
     return UA_STATUSCODE_GOOD;
 }
 
-static UA_Client_Subscription *findSubscription(const UA_Client *client, UA_UInt32 subscriptionId)
-{
+static UA_Client_Subscription *
+findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) {
     UA_Client_Subscription *sub = NULL;
     LIST_FOREACH(sub, &client->subscriptions, listEntry) {
         if(sub->subscriptionID == subscriptionId)
@@ -68,22 +70,15 @@ static UA_Client_Subscription *findSubscription(const UA_Client *client, UA_UInt
     return sub;
 }
 
-/* remove the subscription remotely */
 UA_StatusCode
 UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
     UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
     if(!sub)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
 
-    UA_StatusCode retval = UA_STATUSCODE_GOOD;
-    UA_Client_MonitoredItem *mon, *tmpmon;
-    LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmpmon) {
-        retval =
-            UA_Client_Subscriptions_removeMonitoredItem(client, sub->subscriptionID,
-                                                        mon->monitoredItemId);
-        if(retval != UA_STATUSCODE_GOOD)
-            return retval;
-    }
+    /* remove the subscription from the list    */
+    /* will be reinserted after if error occurs */
+    LIST_REMOVE(sub, listEntry);
 
     /* remove the subscription remotely */
     UA_DeleteSubscriptionsRequest request;
@@ -91,19 +86,30 @@ UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
     request.subscriptionIdsSize = 1;
     request.subscriptionIds = &sub->subscriptionID;
     UA_DeleteSubscriptionsResponse response = UA_Client_Service_deleteSubscriptions(client, request);
-    retval = response.responseHeader.serviceResult;
+
+    UA_StatusCode retval = response.responseHeader.serviceResult;
     if(retval == UA_STATUSCODE_GOOD && response.resultsSize > 0)
         retval = response.results[0];
     UA_DeleteSubscriptionsResponse_deleteMembers(&response);
 
     if(retval != UA_STATUSCODE_GOOD && retval != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
+        /* error occurs re-insert the subscription in the list */
+        LIST_INSERT_HEAD(&client->subscriptions, sub, listEntry);
         UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
                     "Could not remove subscription %u with error code %s",
                     sub->subscriptionID, UA_StatusCode_name(retval));
         return retval;
     }
 
-    UA_Client_Subscriptions_forceDelete(client, sub);
+    /* remove the monitored items locally */
+    UA_Client_MonitoredItem *mon, *tmpmon;
+    LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmpmon) {
+        UA_NodeId_deleteMembers(&mon->monitoredNodeId);
+        LIST_REMOVE(mon, listEntry);
+        UA_free(mon);
+    }
+    UA_free(sub);
+
     return UA_STATUSCODE_GOOD;
 }
 
@@ -178,6 +184,7 @@ addMonitoredItems(UA_Client *client, const UA_UInt32 subscriptionId,
 
         itemResults[i] = result->statusCode;
         if(result->statusCode != UA_STATUSCODE_GOOD) {
+            newMonitoredItemIds[i] = 0;
             UA_free(newMon);
             continue;
         }
@@ -213,7 +220,6 @@ addMonitoredItems(UA_Client *client, const UA_UInt32 subscriptionId,
     return retval;
 }
 
-
 UA_StatusCode
 UA_Client_Subscriptions_addMonitoredItems(UA_Client *client, const UA_UInt32 subscriptionId,
                                           UA_MonitoredItemCreateRequest *items, size_t itemsSize,
@@ -245,7 +251,6 @@ UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscripti
     return retval | retval_item;
 }
 
-
 UA_StatusCode
 UA_Client_Subscriptions_addMonitoredEvents(UA_Client *client, const UA_UInt32 subscriptionId,
                                            UA_MonitoredItemCreateRequest *items, size_t itemsSize,
@@ -359,123 +364,224 @@ UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscri
     return retval | retval_item;
 }
 
+/* Assume the request is already initialized */
+static UA_StatusCode
+UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) {
+    /* Count acks */
+    UA_Client_NotificationsAckNumber *ack;
+    LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
+        ++request->subscriptionAcknowledgementsSize;
+
+    /* Create the array. Returns a sentinel pointer if the length is zero. */
+    request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
+        UA_Array_new(request->subscriptionAcknowledgementsSize,
+                     &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]);
+    if(!request->subscriptionAcknowledgements) {
+        request->subscriptionAcknowledgementsSize = 0;
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    }
+
+    size_t i = 0;
+    UA_Client_NotificationsAckNumber *ack_tmp;
+    LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) {
+        request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
+        request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
+        ++i;
+        LIST_REMOVE(ack, listEntry);
+        UA_free(ack);
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+/* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */
+/* The value 0 is never used for the sequence number         */
+static UA_UInt32
+UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) {
+    UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
+    if(nextSequenceNumber == 0)
+        nextSequenceNumber = 1;
+    return nextSequenceNumber;
+}
+
 static void
-UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
-                                 UA_PublishResponse *response) {
-    if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
-        return;
+processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub,
+                              UA_DataChangeNotification *dataChangeNotification) {
+    for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
+        UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
 
-    UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
-    if(!sub)
-        return;
+        /* Find the MonitoredItem */
+        UA_Client_MonitoredItem *mon;
+        LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
+            if(mon->clientHandle == mitemNot->clientHandle)
+                break;
+        }
 
-    /* Check if the server has acknowledged any of the sent ACKs */
-    for(size_t i = 0; i < response->resultsSize && i < request->subscriptionAcknowledgementsSize; ++i) {
-        /* remove also acks that are unknown to the server */
-        if(response->results[i] != UA_STATUSCODE_GOOD &&
-           response->results[i] != UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN)
+        if(!mon) {
+            UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                         "Could not process a notification with clienthandle %u on subscription %u",
+                         mitemNot->clientHandle, sub->subscriptionID);
             continue;
+        }
 
-        /* Remove the ack from the list */
-        UA_SubscriptionAcknowledgement *orig_ack = &request->subscriptionAcknowledgements[i];
-        UA_Client_NotificationsAckNumber *ack;
-        LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
-            if(ack->subAck.subscriptionId == orig_ack->subscriptionId &&
-               ack->subAck.sequenceNumber == orig_ack->sequenceNumber) {
-                LIST_REMOVE(ack, listEntry);
-                UA_free(ack);
-                UA_assert(ack != LIST_FIRST(&client->pendingNotificationsAcks));
-                break;
-            }
+        if(mon->isEventMonitoredItem) {
+            UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                         "MonitoredItem is configured for Events. But received a "
+                         "DataChangeNotification.");
+            continue;
         }
+
+        mon->handler.dataChangeHandler(client, mon->monitoredItemId,
+                                       &mitemNot->value, mon->handlerContext);
     }
+}
 
-    /* Process the notification messages */
-    UA_NotificationMessage *msg = &response->notificationMessage;
-    for(size_t k = 0; k < msg->notificationDataSize; ++k) {
-        if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
-            continue;
+static void
+processEventNotification(UA_Client *client, UA_Client_Subscription *sub,
+                         UA_EventNotificationList *eventNotificationList) {
+    for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
+        UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
 
-        /* Handle DataChangeNotification */
-        if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
-            UA_DataChangeNotification *dataChangeNotification =
-                (UA_DataChangeNotification *)msg->notificationData[k].content.decoded.data;
-            for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
-                UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
-
-                /* Find the MonitoredItem */
-                UA_Client_MonitoredItem *mon;
-                LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
-                    if(mon->clientHandle == mitemNot->clientHandle)
-                        break;
-                }
-
-                if(!mon) {
-                    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                                 "Could not process a notification with clienthandle %u on subscription %u",
-                                 mitemNot->clientHandle, sub->subscriptionID);
-                    continue;
-                }
-
-                if(mon->isEventMonitoredItem) {
-                    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                                 "MonitoredItem is configured for Events. But received a "
-                                 "DataChangeNotification.");
-                    continue;
-                }
-
-                mon->handler.dataChangeHandler(client, mon->monitoredItemId,
-                                               &mitemNot->value, mon->handlerContext);
-            }
+        /* Find the MonitoredItem */
+        UA_Client_MonitoredItem *mon;
+        LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
+            if(mon->clientHandle == eventFieldList->clientHandle)
+                break;
+        }
+
+        if(!mon) {
+            UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                         "Could not process a notification with clienthandle %u on subscription %u",
+                         eventFieldList->clientHandle, sub->subscriptionID);
             continue;
         }
 
-        /* Handle EventNotification */
-        if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
-            UA_EventNotificationList *eventNotificationList =
-                (UA_EventNotificationList *)msg->notificationData[k].content.decoded.data;
-            for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
-                UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
-
-                /* Find the MonitoredItem */
-                UA_Client_MonitoredItem *mon;
-                LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
-                    if(mon->clientHandle == eventFieldList->clientHandle)
-                        break;
-                }
-
-                if(!mon) {
-                    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                                 "Could not process a notification with clienthandle %u on subscription %u",
-                                 eventFieldList->clientHandle, sub->subscriptionID);
-                    continue;
-                }
-
-                if(!mon->isEventMonitoredItem) {
-                    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                                 "MonitoredItem is configured for DataChanges. But received a "
-                                 "EventNotification.");
-                    continue;
-                }
-
-                mon->handler.eventHandler(client, mon->monitoredItemId, eventFieldList->eventFieldsSize,
-                                          eventFieldList->eventFields, mon->handlerContext);
-            }
+        if(!mon->isEventMonitoredItem) {
+            UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                         "MonitoredItem is configured for DataChanges. But received a "
+                         "EventNotification.");
+            continue;
         }
+
+        mon->handler.eventHandler(client, mon->monitoredItemId, eventFieldList->eventFieldsSize,
+                                  eventFieldList->eventFields, mon->handlerContext);
     }
+}
 
-    /* Add to the list of pending acks */
-    UA_Client_NotificationsAckNumber *tmpAck =
-        (UA_Client_NotificationsAckNumber*)UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
-    if(!tmpAck) {
-        UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                       "Not enough memory to store the acknowledgement for a publish "
-                       "message on subscription %u", sub->subscriptionID);
+
+static void
+processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub,
+                           UA_ExtensionObject *msg) {
+    if(msg->encoding != UA_EXTENSIONOBJECT_DECODED)
+        return;
+
+    /* Handle DataChangeNotification */
+    if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
+        UA_DataChangeNotification *dataChangeNotification =
+            (UA_DataChangeNotification *)msg->content.decoded.data;
+        processDataChangeNotification(client, sub, dataChangeNotification);
+        return;
+    }
+
+    /* Handle EventNotification */
+    if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
+        UA_EventNotificationList *eventNotificationList =
+            (UA_EventNotificationList *)msg->content.decoded.data;
+        processEventNotification(client, sub, eventNotificationList);
         return;
     }
-    tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
-    tmpAck->subAck.subscriptionId = sub->subscriptionID;
-    LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
+}
+
+static void
+processPublishResponse(UA_Client *client, UA_PublishRequest *request,
+                       UA_PublishResponse *response) {
+    UA_NotificationMessage *msg = &response->notificationMessage;
+
+    client->currentlyOutStandingPublishRequests--;
+
+    if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS){
+        if(client->config.outStandingPublishRequests > 1){
+            client->config.outStandingPublishRequests--;
+            UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                          "Too many publishrequest, we reduce outStandingPublishRequests to %d",
+                           client->config.outStandingPublishRequests);
+        } else {
+            UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                         "Too many publishrequest when outStandingPublishRequests = 1");
+            UA_Client_close(client);
+        }
+        goto cleanup;
+    }
+
+    if(response->responseHeader.serviceResult == UA_STATUSCODE_BADNOSUBSCRIPTION){
+       if(LIST_FIRST(&client->subscriptions)){
+            UA_Client_close(client);
+            UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                         "PublishRequest error : No subscription");
+        }
+        goto cleanup;
+    }
+
+    if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID){
+        UA_Client_close(client);
+        UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                         "Received BadSessionIdInvalid");
+        goto cleanup;
+    }
+
+    UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
+    if(!sub)
+        goto cleanup;
+    sub->lastActivity = UA_DateTime_nowMonotonic();
+
+
+    if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
+        goto cleanup;
+
+    /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */
+    if((sub->sequenceNumber != msg->sequenceNumber) &&
+        (UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber)) {
+        UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                     "Invalid subscritpion sequenceNumber");
+        UA_Client_close(client);
+        goto cleanup;
+    }
+    sub->sequenceNumber = msg->sequenceNumber;
+
+    /* Process the notification messages */
+    for(size_t k = 0; k < msg->notificationDataSize; ++k)
+        processNotificationMessage(client, sub, &msg->notificationData[k]);
+
+    /* Add to the list of pending acks */
+    for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) {
+        if(response->availableSequenceNumbers[i] != msg->sequenceNumber)
+            continue;
+        UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*)
+            UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
+        if(!tmpAck) {
+            UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                           "Not enough memory to store the acknowledgement for a publish "
+                           "message on subscription %u", sub->subscriptionID);
+            break;
+        }   
+        tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
+        tmpAck->subAck.subscriptionId = sub->subscriptionID;
+        LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
+        break;
+    } 
+
+ cleanup:
+    UA_PublishRequest_deleteMembers(request);
+}
+
+static void
+processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId,
+                            void *response, const UA_DataType *responseType) {
+    UA_PublishRequest *req = (UA_PublishRequest*)userdata;
+    UA_PublishResponse *res = (UA_PublishResponse*)response;
+    processPublishResponse(client, req, res);
+    UA_PublishRequest_delete(req);
+    /* Fill up the outstanding publish requests */
+    UA_Client_Subscriptions_backgroundPublish(client);
 }
 
 UA_StatusCode
@@ -492,28 +598,20 @@ UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
     while(moreNotifications) {
         UA_PublishRequest request;
         UA_PublishRequest_init(&request);
-        request.subscriptionAcknowledgementsSize = 0;
-
-        UA_Client_NotificationsAckNumber *ack;
-        LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
-            ++request.subscriptionAcknowledgementsSize;
-        if(request.subscriptionAcknowledgementsSize > 0) {
-            request.subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
-                UA_malloc(sizeof(UA_SubscriptionAcknowledgement) * request.subscriptionAcknowledgementsSize);
-            if(!request.subscriptionAcknowledgements)
-                return UA_STATUSCODE_BADOUTOFMEMORY;
-        }
+        retval = UA_Client_preparePublishRequest(client, &request);
+        if(retval != UA_STATUSCODE_GOOD)
+            return retval;
 
-        int i = 0;
-        LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
-            request.subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
-            request.subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
-            ++i;
-        }
+        /* Manually increase the number of sent publish requests. Otherwise we
+         * send out one too many when we process async responses when we wait
+         * for the correct publish response. The
+         * currentlyOutStandingPublishRequests will be reduced during processing
+         * of the response. */
+        client->currentlyOutStandingPublishRequests++;
 
         UA_PublishResponse response = UA_Client_Service_publish(client, request);
-        UA_Client_processPublishResponse(client, &request, &response);
-
+        processPublishResponse(client, &request, &response);
+        
         now = UA_DateTime_nowMonotonic();
         if(now > maxDate) {
             moreNotifications = UA_FALSE;
@@ -545,4 +643,52 @@ UA_Client_Subscriptions_clean(UA_Client *client) {
         UA_Client_Subscriptions_forceDelete(client, sub); /* force local removal */
 }
 
+UA_StatusCode
+UA_Client_Subscriptions_backgroundPublish(UA_Client *client) {
+    if(client->state < UA_CLIENTSTATE_SESSION)
+        return UA_STATUSCODE_BADSERVERNOTCONNECTED;
+
+    /* The session must have at least one subscription */
+    if(!LIST_FIRST(&client->subscriptions))
+        return UA_STATUSCODE_GOOD;
+
+    while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) {
+        UA_PublishRequest *request = UA_PublishRequest_new();
+        if (!request)
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+
+        UA_StatusCode retval = UA_Client_preparePublishRequest(client, request);
+        if(retval != UA_STATUSCODE_GOOD) {
+            UA_PublishRequest_delete(request);
+            return retval;
+        }
+    
+        UA_UInt32 requestId;
+        client->currentlyOutStandingPublishRequests++;
+        retval = __UA_Client_AsyncService(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
+                                          processPublishResponseAsync,
+                                          &UA_TYPES[UA_TYPES_PUBLISHRESPONSE],
+                                          (void*)request, &requestId);
+        if(retval != UA_STATUSCODE_GOOD) {
+            UA_PublishRequest_delete(request);
+            return retval;
+        }
+    }
+
+    /* Check subscriptions inactivity */
+    UA_Client_Subscription *sub;
+    LIST_FOREACH(sub, &client->subscriptions, listEntry) {
+        if(((UA_DateTime)(sub->publishingInterval * sub->keepAliveCount + client->config.timeout) * 
+             UA_DATETIME_MSEC + sub->lastActivity) < UA_DateTime_nowMonotonic()) {
+            UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                         "Inactivity for Subscription %d. Closing the connection.",
+                         sub->subscriptionID);
+            UA_Client_close(client);
+            return UA_STATUSCODE_BADCONNECTIONCLOSED;
+        }
+    }
+
+    return UA_STATUSCODE_GOOD;
+}
+
 #endif /* UA_ENABLE_SUBSCRIPTIONS */

+ 11 - 0
src/client/ua_client_internal.h

@@ -55,6 +55,8 @@ typedef struct UA_Client_Subscription {
     UA_UInt32 subscriptionID;
     UA_UInt32 notificationsPerPublish;
     UA_UInt32 priority;
+    UA_UInt32 sequenceNumber;
+    UA_DateTime lastActivity;
     LIST_HEAD(UA_ListOfClientMonitoredItems, UA_Client_MonitoredItem) monitoredItems;
 } UA_Client_Subscription;
 
@@ -62,6 +64,9 @@ void UA_Client_Subscriptions_forceDelete(UA_Client *client, UA_Client_Subscripti
 
 void UA_Client_Subscriptions_clean(UA_Client *client);
 
+UA_StatusCode
+UA_Client_Subscriptions_backgroundPublish(UA_Client *client);
+
 #endif
 
 /**********/
@@ -76,6 +81,11 @@ typedef struct AsyncServiceCall {
     void *userdata;
 } AsyncServiceCall;
 
+void UA_Client_AsyncService_cancel(UA_Client *client, AsyncServiceCall *ac,
+                                   UA_StatusCode statusCode);
+
+void UA_Client_AsyncService_removeAll(UA_Client *client, UA_StatusCode statusCode);
+
 typedef enum {
     UA_CLIENTAUTHENTICATION_NONE,
     UA_CLIENTAUTHENTICATION_USERNAME
@@ -115,6 +125,7 @@ struct UA_Client {
     UA_UInt32 monitoredItemHandles;
     LIST_HEAD(ListOfUnacknowledgedNotifications, UA_Client_NotificationsAckNumber) pendingNotificationsAcks;
     LIST_HEAD(ListOfClientSubscriptionItems, UA_Client_Subscription) subscriptions;
+    UA_UInt16 currentlyOutStandingPublishRequests;
 #endif
 };
 

+ 3 - 1
tests/CMakeLists.txt

@@ -49,7 +49,9 @@ add_library(open62541-testplugins OBJECT ${test_plugin_sources})
 add_dependencies(open62541-testplugins open62541)
 target_compile_definitions(open62541-testplugins PRIVATE -DUA_DYNAMIC_LINKING_EXPORT)
 
-
+if(NOT MSVC)
+    add_definitions(-Wno-deprecated-declarations)
+endif()
 # Workaround some clang warnings in the uni tests
 if((NOT ${CMAKE_SYSTEM_NAME} MATCHES "OpenBSD") AND (CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID}" STREQUAL "xClang"))
     add_definitions(-Wno-sign-conversion)

+ 7 - 1
tests/client/check_client.c

@@ -214,6 +214,9 @@ START_TEST(Client_reconnect) {
 }
 END_TEST
 
+// TODO ACTIVATE THE TESTS WHEN SESSION RECOVERY IS GOOD
+#ifdef UA_SESSION_RECOVERY
+
 START_TEST(Client_activateSessionClose) {
     // restart server
     teardown();
@@ -303,6 +306,8 @@ START_TEST(Client_activateSessionTimeout) {
 }
 END_TEST
 
+#endif /* UA_SESSION_RECOVERY */
+
 static Suite* testSuite_Client(void) {
     Suite *s = suite_create("Client");
     TCase *tc_client = tcase_create("Client Basic");
@@ -312,14 +317,15 @@ static Suite* testSuite_Client(void) {
     tcase_add_test(tc_client, Client_endpoints);
     tcase_add_test(tc_client, Client_endpoints_empty);
     tcase_add_test(tc_client, Client_read);
-    tcase_add_test(tc_client, Client_delete_without_connect);
     suite_add_tcase(s,tc_client);
     TCase *tc_client_reconnect = tcase_create("Client Reconnect");
     tcase_add_checked_fixture(tc_client_reconnect, setup, teardown);
     tcase_add_test(tc_client_reconnect, Client_renewSecureChannel);
     tcase_add_test(tc_client_reconnect, Client_reconnect);
+#ifdef UA_SESSION_RECOVERY
     tcase_add_test(tc_client_reconnect, Client_activateSessionClose);
     tcase_add_test(tc_client_reconnect, Client_activateSessionTimeout);
+#endif /* UA_SESSION_RECOVERY */
     suite_add_tcase(s,tc_client_reconnect);
     return s;
 }

+ 157 - 7
tests/client/check_client_subscriptions.c

@@ -2,6 +2,10 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
+#ifdef _MSC_VER
+#pragma warning(disable:4996) // warning C4996: 'UA_Client_Subscriptions_manuallySendPublishRequest': was declared deprecated
+#endif
+
 #include <stdio.h>
 #include <stdlib.h>
 #include "ua_types.h"
@@ -17,7 +21,6 @@
 #include "testing_networklayers.h"
 #include "thread_wrapper.h"
 
-
 UA_Server *server;
 UA_ServerConfig *config;
 UA_Boolean *running;
@@ -34,6 +37,7 @@ static void setup(void) {
     running = UA_Boolean_new();
     *running = true;
     config = UA_ServerConfig_new_default();
+    config->maxPublishReqPerSession = 8;
     server = UA_Server_new(config);
     UA_Server_run_startup(server);
     THREAD_CREATE(server_thread, serverloop);
@@ -50,12 +54,20 @@ static void teardown(void) {
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 
-UA_Boolean notificationReceived;
+UA_Boolean notificationReceived = false;
 UA_UInt32 countNotificationReceived = 0;
 
-static void monitoredItemHandler(UA_Client *client, UA_UInt32 monId, UA_DataValue *value, void *context) {
+static void
+monitoredItemHandler(UA_Client *client, UA_UInt32 monId, UA_DataValue *value, void *context) {
+    notificationReceived = true;
+    countNotificationReceived++;
+}
+
+static void
+monitoredItemHandlerSubSleep(UA_Client *client, UA_UInt32 monId, UA_DataValue *value, void *context) {
     notificationReceived = true;
     countNotificationReceived++;
+    UA_fakeSleep((UA_UInt32)(UA_SubscriptionSettings_default.requestedPublishingInterval + 2));
 }
 
 START_TEST(Client_subscription) {
@@ -165,11 +177,12 @@ START_TEST(Client_subscription_addMonitoredItems) {
     ck_assert_uint_eq(notificationReceived, true);
     ck_assert_uint_eq(countNotificationReceived, 3);
 
-    retval = UA_Client_Subscriptions_removeMonitoredItem(client, subId, newMonitoredItemIds[0]);
-    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
-
-    retval = UA_Client_Subscriptions_removeMonitoredItem(client, subId, newMonitoredItemIds[1]);
+    retval = UA_Client_Subscriptions_removeMonitoredItems(client, subId, newMonitoredItemIds,
+                                                          3, itemResults);
     ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(itemResults[0], UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(itemResults[1], UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(itemResults[2], UA_STATUSCODE_BADMONITOREDITEMIDINVALID);
 
     retval = UA_Client_Subscriptions_remove(client, subId);
     ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
@@ -281,6 +294,141 @@ START_TEST(Client_subscription_connectionClose) {
 }
 END_TEST
 
+START_TEST(Client_subscription_without_notification) {
+    UA_Client *client = UA_Client_new(UA_ClientConfig_default);
+    UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    UA_UInt32 subId;
+    retval = UA_Client_Subscriptions_new(client, UA_SubscriptionSettings_default, &subId);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    /* monitor the server state */
+    UA_UInt32 monId;
+    retval = UA_Client_Subscriptions_addMonitoredItem(client, subId, UA_NODEID_NUMERIC(0, 2259),
+                                                      UA_ATTRIBUTEID_VALUE, monitoredItemHandler,
+                                                      NULL, &monId, 9999999);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    UA_fakeSleep((UA_UInt32)UA_SubscriptionSettings_default.requestedPublishingInterval + 1);
+
+    notificationReceived = false;
+    retval = UA_Client_Subscriptions_manuallySendPublishRequest(client);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(notificationReceived, true);
+
+    UA_fakeSleep((UA_UInt32)UA_SubscriptionSettings_default.requestedPublishingInterval + 1);
+
+    notificationReceived = false;
+    retval = UA_Client_Subscriptions_manuallySendPublishRequest(client);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(notificationReceived, false);
+
+    retval = UA_Client_Subscriptions_removeMonitoredItem(client, subId, monId);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    retval = UA_Client_Subscriptions_remove(client, subId);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    UA_Client_disconnect(client);
+    UA_Client_delete(client);
+}
+END_TEST
+
+static UA_ClientState callbackClientState;
+
+static void
+stateCallback (UA_Client *client, UA_ClientState clientState){
+    callbackClientState = clientState;
+
+    if (clientState == UA_CLIENTSTATE_SESSION){
+        /* A new session was created. We need to create the subscription. */
+        /* Create a subscription */
+        UA_UInt32 subId = 0;
+        UA_Client_Subscriptions_new(client, UA_SubscriptionSettings_default, &subId);
+        ck_assert_uint_ne(subId, 0);
+
+        /* Add a MonitoredItem */
+        UA_UInt32 monId = 0;
+        UA_NodeId monitorThis = UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
+        UA_Client_Subscriptions_addMonitoredItem(client, subId, monitorThis, UA_ATTRIBUTEID_VALUE,
+                                                 &monitoredItemHandlerSubSleep, NULL, &monId, 250);
+        ck_assert_uint_ne(monId, 0);
+    }
+}
+
+START_TEST(Client_subscription_async_sub) {
+    UA_ClientConfig clientConfig = UA_ClientConfig_default;
+    /* Set stateCallback */
+    clientConfig.stateCallback = stateCallback;
+
+    /* Activate background publish request */
+    clientConfig.outStandingPublishRequests = 10;
+
+    UA_Client *client = UA_Client_new(clientConfig);
+    ck_assert_uint_eq(callbackClientState, UA_CLIENTSTATE_DISCONNECTED);
+
+    UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(callbackClientState, UA_CLIENTSTATE_SESSION);
+
+    UA_fakeSleep((UA_UInt32)UA_SubscriptionSettings_default.requestedPublishingInterval + 1);
+
+    countNotificationReceived = 0;
+
+    notificationReceived = false;
+    UA_Client_runAsync(client, (UA_UInt16)(UA_SubscriptionSettings_default.requestedPublishingInterval + 1));
+    ck_assert_uint_eq(notificationReceived, true);
+    ck_assert_uint_eq(countNotificationReceived, 1);
+
+    notificationReceived = false;
+    UA_Client_runAsync(client, (UA_UInt16)(UA_SubscriptionSettings_default.requestedPublishingInterval + 1));
+    ck_assert_uint_eq(notificationReceived, true);
+    ck_assert_uint_eq(countNotificationReceived, 2);
+
+    notificationReceived = false;
+    UA_Client_runAsync(client, (UA_UInt16)(UA_SubscriptionSettings_default.requestedPublishingInterval + 1));
+    ck_assert_uint_eq(notificationReceived, true);
+    ck_assert_uint_eq(countNotificationReceived, 3);
+
+    notificationReceived = false;
+    UA_Client_runAsync(client, (UA_UInt16)(UA_SubscriptionSettings_default.requestedPublishingInterval + 1));
+    ck_assert_uint_eq(notificationReceived, true);
+    ck_assert_uint_eq(countNotificationReceived, 4);
+
+    notificationReceived = false;
+    UA_Client_runAsync(client, (UA_UInt16)(UA_SubscriptionSettings_default.requestedPublishingInterval + 1));
+    ck_assert_uint_eq(notificationReceived, true);
+    ck_assert_uint_eq(countNotificationReceived, 5);
+
+    ck_assert_uint_lt(client->config.outStandingPublishRequests, 10);
+
+    UA_Client_recv = client->connection.recv;
+    client->connection.recv = UA_Client_recvTesting;
+
+    notificationReceived = false;
+    /* Simulate network cable unplugged (no response from server) */
+    UA_Client_recvTesting_result = UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
+    UA_Client_runAsync(client, (UA_UInt16)(UA_SubscriptionSettings_default.requestedPublishingInterval + 1));
+    ck_assert_uint_eq(notificationReceived, false);
+    ck_assert_uint_eq(callbackClientState, UA_CLIENTSTATE_SESSION);
+
+    /* Simulate network cable unplugged (no response from server) */
+    UA_Client_recvTesting_result = UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
+    UA_Client_runAsync(client, (UA_UInt16)clientConfig.timeout);
+    ck_assert_uint_eq(notificationReceived, false);
+    ck_assert_uint_eq(callbackClientState, UA_CLIENTSTATE_SESSION);
+
+    /* Simulate network cable unplugged (no response from server) */
+    UA_Client_recvTesting_result = UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
+    UA_Client_runAsync(client, (UA_UInt16)clientConfig.timeout);
+    ck_assert_uint_eq(notificationReceived, false);
+    ck_assert_uint_eq(callbackClientState, UA_CLIENTSTATE_DISCONNECTED);
+
+    UA_Client_delete(client);
+}
+END_TEST
+
 START_TEST(Client_methodcall) {
     UA_Client *client = UA_Client_new(UA_ClientConfig_default);
     UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
@@ -341,6 +489,8 @@ static Suite* testSuite_Client(void) {
     tcase_add_test(tc_client, Client_subscription_connectionClose);
     tcase_add_test(tc_client, Client_subscription_addMonitoredItems);
     tcase_add_test(tc_client, Client_subscription_keepAlive);
+    tcase_add_test(tc_client, Client_subscription_without_notification);
+    tcase_add_test(tc_client, Client_subscription_async_sub);
 #endif /* UA_ENABLE_SUBSCRIPTIONS */
 
     TCase *tc_client2 = tcase_create("Client Subscription + Method Call of GetMonitoredItmes");

+ 3 - 2
tests/fuzz/CMakeLists.txt

@@ -6,7 +6,9 @@ include_directories(${PROJECT_SOURCE_DIR}/plugins)
 include_directories(${PROJECT_SOURCE_DIR}/tests/testing-plugins)
 include_directories(${PROJECT_BINARY_DIR}/src_generated)
 
-
+if(NOT MSVC)
+    add_definitions(-Wno-deprecated-declarations)
+endif()
 
 if (UA_BUILD_FUZZING_CORPUS)
     set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
@@ -49,7 +51,6 @@ else()
 endif()
 list(APPEND LIBS "${open62541_LIBRARIES}")
 
-
 # Use different plugins for testing
 set(fuzzing_plugin_sources ${PROJECT_SOURCE_DIR}/plugins/ua_network_tcp.c
         ${PROJECT_SOURCE_DIR}/tests/testing-plugins/testing_clock.c

+ 4 - 0
tests/fuzz/corpus_generator.c

@@ -10,6 +10,10 @@
  * corpus to the repository.
  */
 
+#ifdef _MSC_VER
+#pragma warning(disable:4996) // warning C4996: 'UA_Client_Subscriptions_manuallySendPublishRequest': was declared deprecated
+#endif
+
 #ifndef UA_DEBUG_DUMP_PKGS_FILE
 #error UA_DEBUG_DUMP_PKGS_FILE must be defined
 #endif