Browse Source

async client functionality

Julius Pfrommer 7 years ago
parent
commit
bffefa4cd0

+ 14 - 0
CHANGELOG

@@ -1,6 +1,20 @@
 The changelog tracks changes to the public API.
 The changelog tracks changes to the public API.
 Internal refactorings and bug fixes are not reported here.
 Internal refactorings and bug fixes are not reported here.
 
 
+2017-07-03 jpfr
+
+    * Implement asynchronous service calls from the client
+
+      All OPC UA services are asynchronous in nature. So several service
+      calls can be made without waiting for a response first. Responess
+      may come in a different ordering. The client takes a method pointer
+      and a data pointer to perform an asynchronous callback on the request
+      response.
+
+      Synchronous service calls are still supported in the client. Asynchronous
+      responses are processed in the background until the synchronous response
+      (the client is waiting for) returns the control flow back to the user.
+
 2017-06-26 janitza-thbe
 2017-06-26 janitza-thbe
 
 
     * Enable IPv6 in the networklayer plugin
     * Enable IPv6 in the networklayer plugin

+ 26 - 2
include/ua_client.h

@@ -187,8 +187,8 @@ UA_StatusCode UA_EXPORT UA_Client_manuallyRenewSecureChannel(UA_Client *client);
 /**
 /**
  * .. _client-services:
  * .. _client-services:
  *
  *
- * Raw Services
- * ------------
+ * Services
+ * --------
  * The raw OPC UA services are exposed to the client. But most of them time, it
  * The raw OPC UA services are exposed to the client. But most of them time, it
  * is better to use the convenience functions from ``ua_client_highlevel.h``
  * is better to use the convenience functions from ``ua_client_highlevel.h``
  * that wrap the raw services. */
  * that wrap the raw services. */
@@ -407,6 +407,30 @@ UA_Client_Service_publish(UA_Client *client, const UA_PublishRequest request) {
 
 
 #endif
 #endif
 
 
+/**
+ * .. _client-async-services:
+ *
+ * Asynchronous Services
+ * ---------------------
+ * All OPC UA services are asynchronous in nature. So several service calls can
+ * be made without waiting for a response first. Responess may come in a
+ * different ordering. */
+
+typedef void
+(*UA_ClientAsyncServiceCallback)(UA_Client *client, void *userdata,
+                                 UA_UInt32 requestId, const void *response);
+
+/* Don't use this function. Use the type versions below instead. */
+UA_StatusCode UA_EXPORT
+__UA_Client_AsyncService(UA_Client *client, const void *request,
+                         const UA_DataType *requestType,
+                         UA_ClientAsyncServiceCallback callback,
+                         const UA_DataType *responseType,
+                         void *userdata, UA_UInt32 *requestId);
+
+UA_StatusCode UA_EXPORT
+UA_Client_runAsync(UA_Client *client, UA_UInt16 timeout);
+
 /**
 /**
  * .. toctree::
  * .. toctree::
  *
  *

+ 28 - 44
plugins/ua_network_tcp.c

@@ -153,31 +153,7 @@ connection_recv(UA_Connection *connection, UA_ByteString *response,
         return UA_STATUSCODE_BADOUTOFMEMORY; /* not enough memory retry */
         return UA_STATUSCODE_BADOUTOFMEMORY; /* not enough memory retry */
     }
     }
 
 
-    if(timeout > 0) {
-        /* currently, only the client uses timeouts */
-#ifndef _WIN32
-        UA_UInt32 timeout_usec = timeout * 1000;
-# ifdef __APPLE__
-        struct timeval tmptv = {(long int)(timeout_usec / 1000000), timeout_usec % 1000000};
-# else
-        struct timeval tmptv = {(long int)(timeout_usec / 1000000), (long int)(timeout_usec % 1000000)};
-# endif
-        int ret = setsockopt(connection->sockfd, SOL_SOCKET, SO_RCVTIMEO,
-                             (const char *)&tmptv, sizeof(struct timeval));
-#else
-        DWORD timeout_dw = timeout;
-        int ret = setsockopt(connection->sockfd, SOL_SOCKET, SO_RCVTIMEO,
-                             (const char*)&timeout_dw, sizeof(DWORD));
-#endif
-        if(0 != ret) {
-            UA_ByteString_deleteMembers(response);
-            return UA_STATUSCODE_BADCONNECTIONCLOSED;
-        }
-    }
-
-#ifdef __CYGWIN__
-    /* Workaround for https://cygwin.com/ml/cygwin/2013-07/msg00107.html */
-    ssize_t ret;
+    /* Listen on the socket for the given timeout until a message arrives */
     if(timeout > 0) {
     if(timeout > 0) {
         fd_set fdset;
         fd_set fdset;
         FD_ZERO(&fdset);
         FD_ZERO(&fdset);
@@ -185,29 +161,24 @@ connection_recv(UA_Connection *connection, UA_ByteString *response,
         UA_UInt32 timeout_usec = timeout * 1000;
         UA_UInt32 timeout_usec = timeout * 1000;
         struct timeval tmptv = {(long int)(timeout_usec / 1000000),
         struct timeval tmptv = {(long int)(timeout_usec / 1000000),
                                 (long int)(timeout_usec % 1000000)};
                                 (long int)(timeout_usec % 1000000)};
-        int retval = select(connection->sockfd+1, &fdset, NULL, NULL, &tmptv);
-        if(retval && UA_fd_isset(connection->sockfd, &fdset)) {
-            ret = recv(connection->sockfd, (char*)response->data,
-                       connection->localConf.recvBufferSize, 0);
-        } else {
-            ret = 0;
-        }
-    } else {
-        ret = recv(connection->sockfd, (char*)response->data,
-                   connection->localConf.recvBufferSize, 0);
+        int resultsize = select(connection->sockfd+1, &fdset, NULL, NULL, &tmptv);
+
+        /* No result */
+        if(resultsize == 0)
+            return UA_STATUSCODE_GOOD;
     }
     }
-#else
+
+    /* Get the received packet(s) */
     ssize_t ret = recv(connection->sockfd, (char*)response->data,
     ssize_t ret = recv(connection->sockfd, (char*)response->data,
                        connection->localConf.recvBufferSize, 0);
                        connection->localConf.recvBufferSize, 0);
-#endif
 
 
-    /* The socket is shutdown */
+    /* The remote side closed the connection */
     if(ret == 0) {
     if(ret == 0) {
         UA_ByteString_deleteMembers(response);
         UA_ByteString_deleteMembers(response);
         return UA_STATUSCODE_BADCONNECTIONCLOSED;
         return UA_STATUSCODE_BADCONNECTIONCLOSED;
     }
     }
 
 
-    /* error case */
+    /* Error case */
     if(ret < 0) {
     if(ret < 0) {
         UA_ByteString_deleteMembers(response);
         UA_ByteString_deleteMembers(response);
         if(errno__ == INTERRUPTED || (timeout > 0) ?
         if(errno__ == INTERRUPTED || (timeout > 0) ?
@@ -217,12 +188,13 @@ connection_recv(UA_Connection *connection, UA_ByteString *response,
         return UA_STATUSCODE_BADCONNECTIONCLOSED;
         return UA_STATUSCODE_BADCONNECTIONCLOSED;
     }
     }
 
 
-    /* default case */
+    /* Set the length of the received buffer */
     response->length = (size_t)ret;
     response->length = (size_t)ret;
     return UA_STATUSCODE_GOOD;
     return UA_STATUSCODE_GOOD;
 }
 }
 
 
-static UA_StatusCode socket_set_nonblocking(SOCKET sockfd) {
+static UA_StatusCode
+socket_set_nonblocking(SOCKET sockfd) {
 #ifdef _WIN32
 #ifdef _WIN32
     u_long iMode = 1;
     u_long iMode = 1;
     if(ioctlsocket(sockfd, FIONBIO, &iMode) != NO_ERROR)
     if(ioctlsocket(sockfd, FIONBIO, &iMode) != NO_ERROR)
@@ -516,8 +488,8 @@ ServerNetworkLayerTCP_stop(UA_ServerNetworkLayer *nl, UA_Server *server) {
     layer->serverSocketsSize = 0;
     layer->serverSocketsSize = 0;
 
 
     /* Close open connections */
     /* Close open connections */
-    ConnectionEntry *e, *e_tmp;
-    LIST_FOREACH_SAFE(e, &layer->connections, pointers, e_tmp)
+    ConnectionEntry *e;
+    LIST_FOREACH(e, &layer->connections, pointers)
         connection_close(&e->connection);
         connection_close(&e->connection);
 
 
     /* Run recv on client sockets. This picks up the closed sockets and frees
     /* Run recv on client sockets. This picks up the closed sockets and frees
@@ -534,7 +506,19 @@ static void
 ServerNetworkLayerTCP_deleteMembers(UA_ServerNetworkLayer *nl) {
 ServerNetworkLayerTCP_deleteMembers(UA_ServerNetworkLayer *nl) {
     ServerNetworkLayerTCP *layer = (ServerNetworkLayerTCP *)nl->handle;
     ServerNetworkLayerTCP *layer = (ServerNetworkLayerTCP *)nl->handle;
     UA_String_deleteMembers(&nl->discoveryUrl);
     UA_String_deleteMembers(&nl->discoveryUrl);
-    UA_free(layer);
+
+    /* Hard-close and remove remaining connections. The server is no longer
+     * running. So this is safe. */
+    ConnectionEntry *e, *e_tmp;
+    LIST_FOREACH_SAFE(e, &layer->connections, pointers, e_tmp) {
+        LIST_REMOVE(e, pointers);
+        connection_close(&e->connection);
+        CLOSESOCKET(e->connection.sockfd);
+        free(e);
+    }
+
+    /* Free the layer */
+    free(layer);
 }
 }
 
 
 UA_ServerNetworkLayer
 UA_ServerNetworkLayer

+ 192 - 74
src/client/ua_client.c

@@ -42,6 +42,15 @@ static void UA_Client_deleteMembers(UA_Client* client) {
         UA_String_deleteMembers(&client->username);
         UA_String_deleteMembers(&client->username);
     if(client->password.data)
     if(client->password.data)
         UA_String_deleteMembers(&client->password);
         UA_String_deleteMembers(&client->password);
+
+    /* Delete the async service calls */
+    AsyncServiceCall *ac, *ac_tmp;
+    LIST_FOREACH_SAFE(ac, &client->asyncServiceCalls, pointers, ac_tmp) {
+        LIST_REMOVE(ac, pointers);
+        UA_free(ac);
+    }
+
+    /* Delete the subscriptions */
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     UA_Client_NotificationsAckNumber *n, *tmp;
     UA_Client_NotificationsAckNumber *n, *tmp;
     LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
     LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
@@ -654,60 +663,135 @@ UA_StatusCode UA_Client_manuallyRenewSecureChannel(UA_Client *client) {
 /* Raw Services */
 /* Raw Services */
 /****************/
 /****************/
 
 
-struct ResponseDescription {
+/* For both synchronous and asynchronous service calls */
+static UA_StatusCode
+sendServiceRequest(UA_Client *client, const void *request,
+                   const UA_DataType *requestType, UA_UInt32 *requestId) {
+    /* Make sure we have a valid session */
+    UA_StatusCode retval = UA_Client_manuallyRenewSecureChannel(client);
+    if(retval != UA_STATUSCODE_GOOD)
+        return retval;
+
+    /* Adjusting the request header. The const attribute is violated, but we
+     * only touch the following members: */
+    UA_RequestHeader *rr = (UA_RequestHeader*)(uintptr_t)request;
+    rr->authenticationToken = client->authenticationToken; /* cleaned up at the end */
+    rr->timestamp = UA_DateTime_now();
+    rr->requestHandle = ++client->requestHandle;
+
+    /* Send the request */
+    UA_UInt32 rqId = ++client->requestId;
+    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                 "Sending a request of type %i", requestType->typeId.identifier.numeric);
+    retval = UA_SecureChannel_sendBinaryMessage(&client->channel, rqId, rr, requestType);
+    UA_NodeId_init(&rr->authenticationToken);
+    if(retval != UA_STATUSCODE_GOOD)
+        return retval;
+
+    *requestId = rqId;
+    return UA_STATUSCODE_GOOD;
+}
+
+/* Look for the async callback in the linked list, execute and delete it */
+static UA_StatusCode
+processAsyncResponse(UA_Client *client, UA_UInt32 requestId,
+                  UA_NodeId *responseTypeId, UA_ByteString *responseMessage,
+                  size_t *offset) {
+    /* Find the callback */
+    AsyncServiceCall *ac;
+    LIST_FOREACH(ac, &client->asyncServiceCalls, pointers) {
+        if(ac->requestId == requestId)
+            break;
+    }
+    if(!ac)
+        return UA_STATUSCODE_BADREQUESTHEADERINVALID;
+
+    /* Decode the response */
+    void *response = UA_alloca(ac->responseType->memSize);
+    UA_StatusCode retval = UA_decodeBinary(responseMessage, offset, response,
+                                           ac->responseType, 0, NULL);
+
+    /* Call the callback */
+    if(retval == UA_STATUSCODE_GOOD) {
+        ac->callback(client, ac->userdata, requestId, response);
+        UA_deleteMembers(response, ac->responseType);
+    } else {
+        UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                    "Could not decodee the response with Id %u", requestId);
+    }
+
+    /* Remove the callback */
+    LIST_REMOVE(ac, pointers);
+    UA_free(ac);
+    return retval;
+}
+
+/* For synchronous service calls. Execute async responses until the response
+ * with the correct requestId turns up. Then, the response is decoded into the
+ * "response" pointer. If the responseType is NULL, just process responses as
+ * async. */
+typedef struct {
     UA_Client *client;
     UA_Client *client;
-    UA_Boolean processed;
+    UA_Boolean received;
     UA_UInt32 requestId;
     UA_UInt32 requestId;
     void *response;
     void *response;
     const UA_DataType *responseType;
     const UA_DataType *responseType;
-};
+} SyncResponseDescription;
 
 
 static void
 static void
-processServiceResponse(struct ResponseDescription *rd, UA_SecureChannel *channel,
+processServiceResponse(SyncResponseDescription *rd, UA_SecureChannel *channel,
                        UA_MessageType messageType, UA_UInt32 requestId,
                        UA_MessageType messageType, UA_UInt32 requestId,
                        UA_ByteString *message) {
                        UA_ByteString *message) {
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
-    const UA_NodeId expectedNodeId =
-        UA_NODEID_NUMERIC(0, rd->responseType->binaryEncodingId);
+    UA_NodeId expectedNodeId;
     const UA_NodeId serviceFaultNodeId =
     const UA_NodeId serviceFaultNodeId =
         UA_NODEID_NUMERIC(0, UA_TYPES[UA_TYPES_SERVICEFAULT].binaryEncodingId);
         UA_NODEID_NUMERIC(0, UA_TYPES[UA_TYPES_SERVICEFAULT].binaryEncodingId);
 
 
     UA_ResponseHeader *respHeader = (UA_ResponseHeader*)rd->response;
     UA_ResponseHeader *respHeader = (UA_ResponseHeader*)rd->response;
-    rd->processed = true;
 
 
     /* Forward declaration for the goto */
     /* Forward declaration for the goto */
     size_t offset = 0;
     size_t offset = 0;
     UA_NodeId responseId;
     UA_NodeId responseId;
+    UA_NodeId_init(&responseId);
 
 
+    /* Got an error.
+     * TODO: Return as part of the response header */
     if(messageType == UA_MESSAGETYPE_ERR) {
     if(messageType == UA_MESSAGETYPE_ERR) {
         UA_TcpErrorMessage *msg = (UA_TcpErrorMessage*)message;
         UA_TcpErrorMessage *msg = (UA_TcpErrorMessage*)message;
         UA_LOG_ERROR(rd->client->config.logger, UA_LOGCATEGORY_CLIENT,
         UA_LOG_ERROR(rd->client->config.logger, UA_LOGCATEGORY_CLIENT,
                      "Server replied with an error message: %s %.*s",
                      "Server replied with an error message: %s %.*s",
-                     UA_StatusCode_name(msg->error), msg->reason.length, msg->reason.data);
+                     UA_StatusCode_name(msg->error), msg->reason.length,
+                     msg->reason.data);
         retval = msg->error;
         retval = msg->error;
         goto finish;
         goto finish;
-    } else if(messageType != UA_MESSAGETYPE_MSG) {
+    }
+
+    /* Unexpected response type.
+     * TODO: How to process valid OPN responses? */
+    if(messageType != UA_MESSAGETYPE_MSG) {
         UA_LOG_ERROR(rd->client->config.logger, UA_LOGCATEGORY_CLIENT,
         UA_LOG_ERROR(rd->client->config.logger, UA_LOGCATEGORY_CLIENT,
                      "Server replied with the wrong message type");
                      "Server replied with the wrong message type");
         retval = UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
         retval = UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
         goto finish;
         goto finish;
     }
     }
 
 
-    /* Check that the request id matches */
-    /* Todo: we need to demux async responses since a publish responses may come
-       at any time */
-    if(requestId != rd->requestId) {
-        UA_LOG_ERROR(rd->client->config.logger, UA_LOGCATEGORY_CLIENT,
-                     "Reply answers the wrong requestId. "
-                     "Async services are not yet implemented.");
-        retval = UA_STATUSCODE_BADINTERNALERROR;
+    /* Decode the data type identifier of the response */
+    retval = UA_NodeId_decodeBinary(message, &offset, &responseId);
+    if(retval != UA_STATUSCODE_GOOD)
+        goto finish;
+
+    /* Got an asynchronous response. Don't expected a synchronous response
+     * (responseType NULL) or the id does not match. */
+    if(!rd->responseType || requestId != rd->requestId) {
+        retval = processAsyncResponse(rd->client, requestId, &responseId, message, &offset);
         goto finish;
         goto finish;
     }
     }
 
 
+    /* Got the synchronous response */
+    rd->received= true;
+
     /* Check that the response type matches */
     /* Check that the response type matches */
-    retval = UA_NodeId_decodeBinary(message, &offset, &responseId);
-    if(retval != UA_STATUSCODE_GOOD)
-        goto finish;
+    expectedNodeId = UA_NODEID_NUMERIC(0, rd->responseType->binaryEncodingId);
     if(!UA_NodeId_equal(&responseId, &expectedNodeId)) {
     if(!UA_NodeId_equal(&responseId, &expectedNodeId)) {
         if(UA_NodeId_equal(&responseId, &serviceFaultNodeId)) {
         if(UA_NodeId_equal(&responseId, &serviceFaultNodeId)) {
             /* Take the statuscode from the servicefault */
             /* Take the statuscode from the servicefault */
@@ -719,7 +803,6 @@ processServiceResponse(struct ResponseDescription *rd, UA_SecureChannel *channel
                          "But retrieved ns=%i,i=%i", expectedNodeId.namespaceIndex,
                          "But retrieved ns=%i,i=%i", expectedNodeId.namespaceIndex,
                          expectedNodeId.identifier.numeric, responseId.namespaceIndex,
                          expectedNodeId.identifier.numeric, responseId.namespaceIndex,
                          responseId.identifier.numeric);
                          responseId.identifier.numeric);
-            UA_NodeId_deleteMembers(&responseId);
             retval = UA_STATUSCODE_BADINTERNALERROR;
             retval = UA_STATUSCODE_BADINTERNALERROR;
         }
         }
         goto finish;
         goto finish;
@@ -731,6 +814,7 @@ processServiceResponse(struct ResponseDescription *rd, UA_SecureChannel *channel
                              rd->client->config.customDataTypes);
                              rd->client->config.customDataTypes);
 
 
  finish:
  finish:
+    UA_NodeId_deleteMembers(&responseId);
     if(retval == UA_STATUSCODE_GOOD) {
     if(retval == UA_STATUSCODE_GOOD) {
         UA_LOG_DEBUG(rd->client->config.logger, UA_LOGCATEGORY_CLIENT,
         UA_LOG_DEBUG(rd->client->config.logger, UA_LOGCATEGORY_CLIENT,
                      "Received a response of type %i", responseId.identifier.numeric);
                      "Received a response of type %i", responseId.identifier.numeric);
@@ -743,63 +827,32 @@ processServiceResponse(struct ResponseDescription *rd, UA_SecureChannel *channel
     }
     }
 }
 }
 
 
-void
-__UA_Client_Service(UA_Client *client, const void *request, const UA_DataType *requestType,
-                    void *response, const UA_DataType *responseType) {
-    UA_init(response, responseType);
-    UA_ResponseHeader *respHeader = (UA_ResponseHeader*)response;
-
-    /* Make sure we have a valid session */
-    UA_StatusCode retval = UA_Client_manuallyRenewSecureChannel(client);
-    if(retval != UA_STATUSCODE_GOOD) {
-        respHeader->serviceResult = retval;
-        client->state = UA_CLIENTSTATE_ERRORED;
-        return;
-    }
-
-    /* Adjusting the request header. The const attribute is violated, but we
-     * only touch the following members: */
-    UA_RequestHeader *rr = (UA_RequestHeader*)(uintptr_t)request;
-    rr->authenticationToken = client->authenticationToken; /* cleaned up at the end */
-    rr->timestamp = UA_DateTime_now();
-    rr->requestHandle = ++client->requestHandle;
-
-    /* Send the request */
-    UA_UInt32 requestId = ++client->requestId;
-    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                 "Sending a request of type %i", requestType->typeId.identifier.numeric);
-    retval = UA_SecureChannel_sendBinaryMessage(&client->channel, requestId, rr, requestType);
-    if(retval != UA_STATUSCODE_GOOD) {
-        if(retval == UA_STATUSCODE_BADENCODINGLIMITSEXCEEDED)
-            respHeader->serviceResult = UA_STATUSCODE_BADREQUESTTOOLARGE;
-        else
-            respHeader->serviceResult = retval;
-        client->state = UA_CLIENTSTATE_FAULTED;
-        UA_NodeId_init(&rr->authenticationToken);
-        return;
-    }
-
+static UA_StatusCode
+receiveServiceResponse(UA_Client *client, void *response,
+                       const UA_DataType *responseType, UA_DateTime maxDate,
+                       UA_UInt32 *synchronousRequestId) {
     /* Prepare the response and the structure we give into processServiceResponse */
     /* Prepare the response and the structure we give into processServiceResponse */
-    UA_init(response, responseType);
-    struct ResponseDescription rd = {client, false, requestId, response, responseType};
+    SyncResponseDescription rd = {client, false, 0, response, responseType};
+
+    /* Return upon receiving the synchronized response. All other responses are
+     * processed with a callback "in the background". */
+    if(synchronousRequestId)
+        rd.requestId = *synchronousRequestId;
 
 
-    /* Retrieve the response */
-    UA_DateTime maxDate = UA_DateTime_nowMonotonic() + (client->config.timeout * UA_MSEC_TO_DATETIME);
     do {
     do {
         /* Retrieve complete chunks */
         /* Retrieve complete chunks */
         UA_ByteString reply = UA_BYTESTRING_NULL;
         UA_ByteString reply = UA_BYTESTRING_NULL;
         UA_Boolean realloced = false;
         UA_Boolean realloced = false;
         UA_DateTime now = UA_DateTime_nowMonotonic();
         UA_DateTime now = UA_DateTime_nowMonotonic();
-        if(now < maxDate) {
-            UA_UInt32 timeout = (UA_UInt32)((maxDate - now) / UA_MSEC_TO_DATETIME);
-            retval = UA_Connection_receiveChunksBlocking(&client->connection, &reply, &realloced, timeout);
-        } else {
-            retval = UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
-        }
-        if(retval != UA_STATUSCODE_GOOD) {
-            respHeader->serviceResult = retval;
-            break;
-        }
+        if(now > maxDate)
+            return UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
+        UA_UInt32 timeout = (UA_UInt32)((maxDate - now) / UA_MSEC_TO_DATETIME);
+        UA_StatusCode retval =
+            UA_Connection_receiveChunksBlocking(&client->connection, &reply,
+                                                &realloced, timeout);
+        if(retval != UA_STATUSCODE_GOOD)
+            return retval;
+
         /* ProcessChunks and call processServiceResponse for complete messages */
         /* ProcessChunks and call processServiceResponse for complete messages */
         UA_SecureChannel_processChunks(&client->channel, &reply,
         UA_SecureChannel_processChunks(&client->channel, &reply,
                                        (UA_ProcessMessageCallback*)processServiceResponse, &rd);
                                        (UA_ProcessMessageCallback*)processServiceResponse, &rd);
@@ -808,8 +861,73 @@ __UA_Client_Service(UA_Client *client, const void *request, const UA_DataType *r
             client->connection.releaseRecvBuffer(&client->connection, &reply);
             client->connection.releaseRecvBuffer(&client->connection, &reply);
         else
         else
             UA_ByteString_deleteMembers(&reply);
             UA_ByteString_deleteMembers(&reply);
-    } while(!rd.processed);
+    } while(!rd.received);
 
 
-    /* Clean up the authentication token */
-    UA_NodeId_init(&rr->authenticationToken);
+    return UA_STATUSCODE_GOOD;
+}
+
+void
+__UA_Client_Service(UA_Client *client, const void *request,
+                    const UA_DataType *requestType, void *response,
+                    const UA_DataType *responseType) {
+    UA_init(response, responseType);
+    UA_ResponseHeader *respHeader = (UA_ResponseHeader*)response;
+
+    /* Send the request */
+    UA_UInt32 requestId;
+    UA_StatusCode retval = sendServiceRequest(client, request, requestType, &requestId);
+    if(retval != UA_STATUSCODE_GOOD) {
+        if(retval == UA_STATUSCODE_BADENCODINGLIMITSEXCEEDED)
+            respHeader->serviceResult = UA_STATUSCODE_BADREQUESTTOOLARGE;
+        else
+            respHeader->serviceResult = retval;
+        client->state = UA_CLIENTSTATE_FAULTED;
+        return;
+    }
+
+    /* Retrieve the response */
+    UA_DateTime maxDate = UA_DateTime_nowMonotonic() +
+        (client->config.timeout * UA_MSEC_TO_DATETIME);
+    retval = receiveServiceResponse(client, response, responseType, maxDate, &requestId);
+    if(retval != UA_STATUSCODE_GOOD)
+        respHeader->serviceResult = retval;
+}
+
+UA_StatusCode
+__UA_Client_AsyncService(UA_Client *client, const void *request,
+                         const UA_DataType *requestType,
+                         UA_ClientAsyncServiceCallback callback,
+                         const UA_DataType *responseType,
+                         void *userdata, UA_UInt32 *requestId) {
+    /* Prepare the entry for the linked list */
+    AsyncServiceCall *ac = (AsyncServiceCall*)UA_malloc(sizeof(AsyncServiceCall));
+    if(!ac)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    ac->callback = callback;
+    ac->responseType = responseType;
+    ac->userdata = userdata;
+
+    /* Call the service and set the requestId */
+    UA_StatusCode retval = sendServiceRequest(client, request, requestType, &ac->requestId);
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_free(ac);
+        return retval;
+    }
+
+    /* Store the entry for async processing */
+    LIST_INSERT_HEAD(&client->asyncServiceCalls, ac, pointers);
+    if(requestId)
+        *requestId = ac->requestId;
+    return UA_STATUSCODE_GOOD;
+}
+
+UA_StatusCode
+UA_Client_runAsync(UA_Client *client, UA_UInt16 timeout) {
+    /* TODO: Call repeated jobs that are scheduled */
+    UA_DateTime maxDate = UA_DateTime_nowMonotonic() +
+        (timeout * UA_MSEC_TO_DATETIME);
+    UA_StatusCode retval = receiveServiceResponse(client, NULL, NULL, maxDate, NULL);
+    if(retval == UA_STATUSCODE_GOODNONCRITICALTIMEOUT)
+        retval = UA_STATUSCODE_GOOD;
+    return retval;
 }
 }

+ 11 - 0
src/client/ua_client_internal.h

@@ -52,6 +52,14 @@ void UA_Client_Subscriptions_forceDelete(UA_Client *client, UA_Client_Subscripti
 /* Client */
 /* Client */
 /**********/
 /**********/
 
 
+typedef struct AsyncServiceCall {
+    LIST_ENTRY(AsyncServiceCall) pointers;
+    UA_UInt32 requestId;
+    UA_ClientAsyncServiceCallback callback;
+    const UA_DataType *responseType;
+    void *userdata;
+} AsyncServiceCall;
+
 typedef enum {
 typedef enum {
     UA_CLIENTAUTHENTICATION_NONE,
     UA_CLIENTAUTHENTICATION_NONE,
     UA_CLIENTAUTHENTICATION_USERNAME
     UA_CLIENTAUTHENTICATION_USERNAME
@@ -80,6 +88,9 @@ struct UA_Client {
     UA_UserTokenPolicy token;
     UA_UserTokenPolicy token;
     UA_NodeId authenticationToken;
     UA_NodeId authenticationToken;
     UA_UInt32 requestHandle;
     UA_UInt32 requestHandle;
+
+    /* Async Service */
+    LIST_HEAD(ListOfAsyncServiceCall, AsyncServiceCall) asyncServiceCalls;
     
     
     /* Subscriptions */
     /* Subscriptions */
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 #ifdef UA_ENABLE_SUBSCRIPTIONS

+ 5 - 0
tests/CMakeLists.txt

@@ -180,6 +180,11 @@ add_executable(check_client check_client.c $<TARGET_OBJECTS:open62541-object> $<
 target_link_libraries(check_client ${LIBS})
 target_link_libraries(check_client ${LIBS})
 add_test(check_client ${TESTS_BINARY_DIR}/check_client)
 add_test(check_client ${TESTS_BINARY_DIR}/check_client)
 
 
+# TODO: Re-enable valgrind test
+add_executable(check_client_async check_client_async.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-testplugins>)
+target_link_libraries(check_client_async ${LIBS})
+add_test(check_client_async ${TESTS_BINARY_DIR}/check_client_async)
+
 # TODO: Re-enable valgrind test
 # TODO: Re-enable valgrind test
 add_executable(check_client_subscriptions check_client_subscriptions.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-testplugins>)
 add_executable(check_client_subscriptions check_client_subscriptions.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-testplugins>)
 target_link_libraries(check_client_subscriptions ${LIBS})
 target_link_libraries(check_client_subscriptions ${LIBS})

+ 114 - 0
tests/check_client_async.c

@@ -0,0 +1,114 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+
+#include "ua_types.h"
+#include "ua_server.h"
+#include "ua_client.h"
+#include "ua_config_standard.h"
+#include "ua_network_tcp.h"
+#include "check.h"
+#include "testing_clock.h"
+
+UA_Server *server;
+UA_Boolean *running;
+UA_ServerNetworkLayer nl;
+pthread_t server_thread;
+
+static void * serverloop(void *_) {
+    while(*running)
+        UA_Server_run_iterate(server, true);
+    return NULL;
+}
+
+static void setup(void) {
+    running = UA_Boolean_new();
+    *running = true;
+    UA_ServerConfig config = UA_ServerConfig_standard;
+    nl = UA_ServerNetworkLayerTCP(UA_ConnectionConfig_standard, 16664);
+    config.networkLayers = &nl;
+    config.networkLayersSize = 1;
+    server = UA_Server_new(config);
+    UA_Server_run_startup(server);
+    pthread_create(&server_thread, NULL, serverloop, NULL);
+}
+
+static void teardown(void) {
+    *running = false;
+    pthread_join(server_thread, NULL);
+    UA_Server_run_shutdown(server);
+    UA_Boolean_delete(running);
+    UA_Server_delete(server);
+    nl.deleteMembers(&nl);
+}
+
+static void
+asyncReadCallback(UA_Client *client, void *userdata,
+                  UA_UInt32 requestId, const UA_ReadResponse *response) {
+    UA_UInt16 *asyncCounter = (UA_UInt16*)userdata;
+    (*asyncCounter)++;
+    UA_sleep(1000);
+}
+
+START_TEST(Client_read_async) {
+    UA_Client *client = UA_Client_new(UA_ClientConfig_standard);
+    UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:16664");
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    UA_UInt16 asyncCounter = 0;
+
+    UA_ReadRequest rr;
+    UA_ReadRequest_init(&rr);
+
+    UA_ReadValueId rvid;
+    UA_ReadValueId_init(&rvid);
+    rvid.attributeId = UA_ATTRIBUTEID_VALUE;
+    rvid.nodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
+
+    rr.nodesToRead = &rvid;
+    rr.nodesToReadSize = 1;
+
+    /* 1st request */
+    retval = __UA_Client_AsyncService(client, &rr, &UA_TYPES[UA_TYPES_READREQUEST],
+                                      (UA_ClientAsyncServiceCallback)asyncReadCallback,
+                                      &UA_TYPES[UA_TYPES_READRESPONSE], &asyncCounter, NULL);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    /* 2nd request */
+    retval = __UA_Client_AsyncService(client, &rr, &UA_TYPES[UA_TYPES_READREQUEST],
+                                      (UA_ClientAsyncServiceCallback)asyncReadCallback,
+                                      &UA_TYPES[UA_TYPES_READRESPONSE], &asyncCounter, NULL);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    /* Process async responses during 100ms */
+    retval = UA_Client_runAsync(client, 1000);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(asyncCounter, 2);
+
+    UA_Client_disconnect(client);
+    UA_Client_delete(client);
+}
+END_TEST
+
+static Suite* testSuite_Client(void) {
+    Suite *s = suite_create("Client");
+    TCase *tc_client = tcase_create("Client Basic");
+    tcase_add_checked_fixture(tc_client, setup, teardown);
+    tcase_add_test(tc_client, Client_read_async);
+    suite_add_tcase(s,tc_client);
+    return s;
+}
+
+int main(void) {
+    Suite *s = testSuite_Client();
+    SRunner *sr = srunner_create(s);
+    srunner_set_fork_status(sr, CK_NOFORK);
+    srunner_run_all(sr,CK_NORMAL);
+    int number_failed = srunner_ntests_failed(sr);
+    srunner_free(sr);
+    return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}

+ 2 - 4
tests/check_client_highlevel.c

@@ -50,9 +50,9 @@ static void setup(void) {
 }
 }
 
 
 static void teardown(void) {
 static void teardown(void) {
-    *running = false;
     UA_Client_disconnect(client);
     UA_Client_disconnect(client);
     UA_Client_delete(client);
     UA_Client_delete(client);
+    *running = false;
     pthread_join(server_thread, NULL);
     pthread_join(server_thread, NULL);
     UA_Server_run_shutdown(server);
     UA_Server_run_shutdown(server);
     UA_Boolean_delete(running);
     UA_Boolean_delete(running);
@@ -63,17 +63,15 @@ static void teardown(void) {
 START_TEST(Misc_State) {
 START_TEST(Misc_State) {
     UA_ClientState state = UA_Client_getState(client);
     UA_ClientState state = UA_Client_getState(client);
     ck_assert_uint_eq(state, UA_CLIENTSTATE_CONNECTED);
     ck_assert_uint_eq(state, UA_CLIENTSTATE_CONNECTED);
-
 }
 }
 END_TEST
 END_TEST
 
 
 START_TEST(Misc_NamespaceGetIndex) {
 START_TEST(Misc_NamespaceGetIndex) {
     UA_UInt16 idx;
     UA_UInt16 idx;
-
     UA_String ns = UA_STRING(CUSTOM_NS);
     UA_String ns = UA_STRING(CUSTOM_NS);
     UA_StatusCode retval = UA_Client_NamespaceGetIndex(client, &ns, &idx);
     UA_StatusCode retval = UA_Client_NamespaceGetIndex(client, &ns, &idx);
-    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
 
 
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
     ck_assert_uint_eq(idx, 2);
     ck_assert_uint_eq(idx, 2);
 
 
     // namespace uri is case sensitive
     // namespace uri is case sensitive