Browse Source

refactor(server): Simplify the async operations intenally

The server config is simplified a bit. A single timeout is used for the
request. No changes to the public API otherwise.

Full integration tests for the async operations.
Julius Pfrommer 4 years ago
parent
commit
3dce9f5ed5

+ 1 - 1
CMakeLists.txt

@@ -654,7 +654,7 @@ set(internal_headers ${PROJECT_SOURCE_DIR}/deps/open62541_queue.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_server_internal.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_services.h
                      ${PROJECT_SOURCE_DIR}/src/client/ua_client_internal.h
-					 ${PROJECT_SOURCE_DIR}/src/server/ua_asyncoperation_manager.h)
+					 ${PROJECT_SOURCE_DIR}/src/server/ua_server_async.h)
 
 # TODO: make client optional
 set(lib_sources ${PROJECT_SOURCE_DIR}/src/ua_types.c

+ 1 - 1
include/open62541/server_config.h

@@ -148,7 +148,7 @@ struct UA_ServerConfig {
 #if UA_MULTITHREADING >= 100
     UA_Double asyncOperationTimeout; /* in ms, 0 => unlimited */
     size_t maxAsyncOperationQueueSize; /* 0 => unlimited */
-    UA_Double asyncCallRequestTimeout; /* in ms, 0 => unlimited */
+    UA_Double asyncCallRequestTimeout UA_DEPRECATED; /* in ms, 0 => unlimited */
     /* Notify workers when an async operation was enqueued */
     UA_Server_AsyncOperationNotifyCallback asyncOperationNotifyCallback;
 #endif

+ 1 - 2
plugins/ua_config_default.c

@@ -226,9 +226,8 @@ setDefaultConfig(UA_ServerConfig *conf) {
 #endif
 
 #if UA_MULTITHREADING >= 100
-    conf->asyncOperationTimeout = 0;
     conf->maxAsyncOperationQueueSize = 0;
-    conf->asyncCallRequestTimeout = 120000; /* Call request Timeout in ms (2 minutes) */
+    conf->asyncOperationTimeout = 120000; /* Async Operation Timeout in ms (2 minutes) */
 #endif
 
     /* --> Finish setting the default static config <-- */

+ 0 - 126
src/server/ua_asyncoperation_manager.h

@@ -1,126 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- *
-  *    Copyright 2019 (c) Fraunhofer IOSB (Author: Klaus Schick)
- * based on
- *    Copyright 2014-2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
- *    Copyright 2014, 2017 (c) Florian Palm
- *    Copyright 2015 (c) Sten Grüner
- *    Copyright 2015 (c) Oleksiy Vasylyev
- *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
- */
-
-#ifndef UA_ASYNCOPERATION_MANAGER_H_
-#define UA_ASYNCOPERATION_MANAGER_H_
-
-#include <open62541/server.h>
-
-#include "open62541_queue.h"
-#include "ua_session.h"
-#include "ua_util_internal.h"
-#include "ua_workqueue.h"
-
-_UA_BEGIN_DECLS
-
-#if UA_MULTITHREADING >= 100
-
-struct AsyncMethodQueueElement {
-    SIMPLEQ_ENTRY(AsyncMethodQueueElement) next;
-    UA_CallMethodRequest m_Request;
-    UA_CallMethodResult	m_Response;
-    UA_DateTime	m_tDispatchTime;
-    UA_UInt32	m_nRequestId;
-    UA_NodeId	m_nSessionId;
-    UA_UInt32	m_nIndex;
-};
-    
-/* Internal Helper to transfer info */
-struct AsyncMethodContextInternal {
-    UA_UInt32 nRequestId;
-    UA_NodeId nSessionId;
-    UA_UInt32 nIndex;
-    const UA_CallRequest* pRequest;
-    UA_SecureChannel* pChannel;
-};
-
-typedef struct asyncOperationEntry {
-    LIST_ENTRY(asyncOperationEntry) pointers;
-    UA_UInt32 requestId;
-    UA_NodeId sessionId;
-    UA_UInt32 requestHandle;
-    UA_DateTime	dispatchTime;       /* Creation time */
-    UA_UInt32 nCountdown;			/* Counter for open UA_CallResults */
-    UA_AsyncOperationType operationType;
-    union {
-        UA_CallResponse callResponse;
-        UA_ReadResponse readResponse;
-        UA_WriteResponse writeResponse;
-    } response;
-} asyncOperationEntry;
-
-typedef struct UA_AsyncOperationManager {
-    /* Requests / Responses */
-    LIST_HEAD(, asyncOperationEntry) asyncOperations;
-    UA_UInt32 currentCount;
-
-    /* Operations belonging to a request */
-    UA_UInt32	nMQCurSize;		/* actual size of queue */
-    UA_UInt64	nCBIdIntegrity;	/* id of callback queue check callback  */
-    UA_UInt64	nCBIdResponse;	/* id of callback check for a response  */
-
-    UA_LOCK_TYPE(ua_request_queue_lock)
-    UA_LOCK_TYPE(ua_response_queue_lock)
-    UA_LOCK_TYPE(ua_pending_list_lock)
-
-    SIMPLEQ_HEAD(, AsyncMethodQueueElement) ua_method_request_queue;    
-    SIMPLEQ_HEAD(, AsyncMethodQueueElement) ua_method_response_queue;
-    SIMPLEQ_HEAD(, AsyncMethodQueueElement) ua_method_pending_list;
-} UA_AsyncOperationManager;
-
-void
-UA_AsyncOperationManager_init(UA_AsyncOperationManager *amm, UA_Server *server);
-
-/* Deletes all entries */
-void UA_AsyncOperationManager_clear(UA_AsyncOperationManager *amm, UA_Server *server);
-
-UA_StatusCode
-UA_AsyncOperationManager_createEntry(UA_AsyncOperationManager *amm, UA_Server *server,
-                                  const UA_NodeId *sessionId, const UA_UInt32 channelId,
-                                  const UA_UInt32 requestId, const UA_UInt32 requestHandle,
-                                  const UA_AsyncOperationType operationType,
-                                  const UA_UInt32 nCountdown);
-
-/* The pointers amm and current must not be NULL */
-void
-UA_AsyncOperationManager_removeEntry(UA_AsyncOperationManager *amm, asyncOperationEntry *current);
-
-asyncOperationEntry *
-UA_AsyncOperationManager_getById(UA_AsyncOperationManager *amm, const UA_UInt32 requestId,
-                              const UA_NodeId *sessionId);
-
-void
-UA_AsyncOperationManager_checkTimeouts(UA_Server *server, UA_AsyncOperationManager *amm);
-
-/* Internal definitions for the unit tests */
-struct AsyncMethodQueueElement *
-UA_AsyncOperationManager_getAsyncMethodResult(UA_AsyncOperationManager *amm);
-void deleteMethodQueueElement(struct AsyncMethodQueueElement *pElem);
-void UA_AsyncOperationManager_addPendingMethodCall(UA_AsyncOperationManager *amm,
-                                                   struct AsyncMethodQueueElement *pElem);
-void UA_AsyncOperationManager_rmvPendingMethodCall(UA_AsyncOperationManager *amm,
-                                                   struct AsyncMethodQueueElement *pElem);
-UA_Boolean
-UA_AsyncOperationManager_isPendingMethodCall(UA_AsyncOperationManager *amm,
-                                             struct AsyncMethodQueueElement *pElem);
-UA_StatusCode
-UA_Server_SetNextAsyncMethod(UA_Server *server, const UA_UInt32 nRequestId,
-                             const UA_NodeId *nSessionId, const UA_UInt32 nIndex,
-                             const UA_CallMethodRequest* pRequest);
-void UA_Server_CheckQueueIntegrity(UA_Server *server, void *_);
-
-#endif /* UA_MULTITHREADING >= 100 */
-
-_UA_END_DECLS
-
-#endif /* UA_ASYNCOPERATION_MANAGER_H_ */

+ 2 - 2
src/server/ua_server.c

@@ -194,7 +194,7 @@ void UA_Server_delete(UA_Server *server) {
 #endif
 
 #if UA_MULTITHREADING >= 100
-    UA_AsyncOperationManager_clear(&server->asyncMethodManager, server);
+    UA_AsyncManager_clear(&server->asyncManager, server);
 #endif
 
     /* Clean up the Admin Session */
@@ -288,7 +288,7 @@ UA_Server_init(UA_Server *server) {
     UA_SessionManager_init(&server->sessionManager, server);
 
 #if UA_MULTITHREADING >= 100
-    UA_AsyncOperationManager_init(&server->asyncMethodManager, server);
+    UA_AsyncManager_init(&server->asyncManager, server);
 #endif
 
     /* Add a regular callback for cleanup and maintenance. With a 10s interval. */

+ 259 - 458
src/server/ua_server_async.c

@@ -3,343 +3,272 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  *
  *    Copyright 2019 (c) Fraunhofer IOSB (Author: Klaus Schick)
- * based on
- *    Copyright 2014-2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
- *    Copyright 2014, 2017 (c) Florian Palm
- *    Copyright 2015 (c) Sten Grüner
- *    Copyright 2015 (c) Oleksiy Vasylyev
- *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
+ *    Copyright 2019 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  */
 
-#include "ua_asyncoperation_manager.h"
 #include "ua_server_internal.h"
-#include "ua_subscription.h"
 
 #if UA_MULTITHREADING >= 100
 
-/*************************/
-/* AsyncOperationManager */
-/*************************/
+static void
+UA_AsyncOperation_delete(UA_AsyncOperation *ar) {
+    UA_CallMethodRequest_clear(&ar->request);
+    UA_CallMethodResult_clear(&ar->response);
+    UA_free(ar);
+}
 
-/* Checks queue element timeouts */
-void
-UA_Server_CheckQueueIntegrity(UA_Server *server, void *_) {
-    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
+static UA_StatusCode
+UA_AsyncManager_sendAsyncResponse(UA_AsyncManager *am, UA_Server *server,
+                                  UA_AsyncResponse *ar) {
+    /* Get the session */
+    UA_StatusCode res = UA_STATUSCODE_GOOD;
+    UA_LOCK(server->serviceMutex);
+    UA_Session* session = UA_SessionManager_getSessionById(&server->sessionManager, &ar->sessionId);
+    UA_UNLOCK(server->serviceMutex);
+    if(!session) {
+        res = UA_STATUSCODE_BADSESSIONIDINVALID;
+        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "UA_Server_InsertMethodResponse: Session is gone");
+        goto clean_up;
+    }
 
-    /* For debugging/testing purposes */
-    if(server->config.asyncOperationTimeout <= 0.0) {
-        UA_AsyncOperationManager_checkTimeouts(server, amm);
-        return;
+    /* Check the channel */
+    UA_SecureChannel* channel = session->header.channel;
+    if(!channel) {
+        res = UA_STATUSCODE_BADSECURECHANNELCLOSED;
+        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "UA_Server_InsertMethodResponse: Channel is gone");
+        goto clean_up;
+    }
+
+    /* Okay, here we go, send the UA_CallResponse */
+    res = sendResponse(channel, ar->requestId, ar->requestHandle,
+                       (UA_ResponseHeader*)&ar->response.callResponse.responseHeader,
+                       &UA_TYPES[UA_TYPES_CALLRESPONSE]);
+    UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "UA_Server_SendResponse: Response for Req# %u sent", ar->requestId);
+
+ clean_up:
+    /* Remove from the AsyncManager */
+    UA_AsyncManager_removeAsyncResponse(&server->asyncManager, ar);
+    return res;
+}
+
+/* Integrate operation result in the AsyncResponse and send out the response if
+ * it is ready. */
+static void
+integrateOperationResult(UA_AsyncManager *am, UA_Server *server,
+                         UA_AsyncOperation *ao) {
+    /* Grab the open request, so we can continue to construct the response */
+    UA_AsyncResponse *ar = ao->parent;
+
+    /* Reduce the number of open results */
+    ar->opCountdown -= 1;
+
+    UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Return result in the server thread with %u remaining",
+                 (UA_UInt32)ar->opCountdown);
+
+    /* Move the UA_CallMethodResult to UA_CallResponse */
+    ar->response.callResponse.results[ao->index] = ao->response;
+    UA_CallMethodResult_init(&ao->response);
+
+    /* Are we done with all operations? */
+    if(ar->opCountdown == 0)
+        UA_AsyncManager_sendAsyncResponse(am, server, ar);
+}
+
+/* Process all operations in the result queue -> move content over to the
+ * AsyncResponse. This is only done by the server thread. */
+static void
+processAsyncResults(UA_Server *server, void *data) {
+    UA_AsyncManager *am = &server->asyncManager;
+    while(true) {
+        UA_LOCK(am->queueLock);
+        UA_AsyncOperation *ao = TAILQ_FIRST(&am->resultQueue);
+        if(ao)
+            TAILQ_REMOVE(&am->resultQueue, ao, pointers);
+        UA_UNLOCK(am->queueLock);
+        if(!ao)
+            break;
+        UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "UA_Server_CallMethodResponse: Got Response: OKAY");
+        integrateOperationResult(am, server, ao);
+        UA_AsyncOperation_delete(ao);
+        am->opsCount--;
     }
+}
+
+/* Check if any operations have timed out */
+static void
+checkTimeouts(UA_Server *server, void *_) {
+    /* Timeouts are not configured */
+    if(server->config.asyncOperationTimeout <= 0.0)
+        return;
 
-    /* To prevent a lockup, we remove a maximum 10% of timed out entries */
-    /* on small queues, we do at least 3 */
-    size_t bMaxRemove = server->config.maxAsyncOperationQueueSize / 10;
-    if(bMaxRemove < 3)
-        bMaxRemove = 3;
-    UA_LOCK(amm->ua_request_queue_lock);
-    /* Check ifentry has been in the queue too long time */
-    while(bMaxRemove-- && !SIMPLEQ_EMPTY(&amm->ua_method_request_queue)) {
-        struct AsyncMethodQueueElement* request_elem = SIMPLEQ_FIRST(&amm->ua_method_request_queue);
-        UA_DateTime tNow = UA_DateTime_now();
-        UA_DateTime tReq = request_elem->m_tDispatchTime;
-        UA_DateTime diff = tNow - tReq;
-        /* queue entry is not older than server->nMQTimeoutSecs, so we stop checking */
-        if(diff <= (UA_DateTime)(server->config.asyncOperationTimeout * UA_DATETIME_MSEC))
+    UA_AsyncManager *am = &server->asyncManager;
+    const UA_DateTime tNow = UA_DateTime_now();
+
+    UA_LOCK(am->queueLock);
+
+    /* Loop over the queue of dispatched ops */
+    UA_AsyncOperation *op = NULL, *op_tmp = NULL;
+    TAILQ_FOREACH_SAFE(op, &am->dispatchedQueue, pointers, op_tmp) {
+        /* The timeout has not passed. Also for all elements following in the queue. */
+        if(tNow <= op->parent->timeout)
             break;
 
-        /* remove it from the queue */
+        /* Mark as timed out and put it into the result queue */
+        op->response.statusCode = UA_STATUSCODE_BADTIMEOUT;
+        TAILQ_REMOVE(&am->dispatchedQueue, op, pointers);
+        TAILQ_INSERT_TAIL(&am->resultQueue, op, pointers);
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                       "UA_Server_CheckQueueIntegrity: Request #%u was removed due to a timeout (%f)",
-                       request_elem->m_nRequestId, server->config.asyncOperationTimeout);
-        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_request_queue, next);
-        amm->nMQCurSize--;
-        /* Notify that we removed this request - e.g. Bad Call Response
-         * (UA_STATUSCODE_BADREQUESTTIMEOUT) */
-        UA_CallMethodResult* result = &request_elem->m_Response;
-        UA_CallMethodResult_clear(result);
-        result->statusCode = UA_STATUSCODE_BADREQUESTTIMEOUT;
-        UA_Server_InsertMethodResponse(server, request_elem->m_nRequestId,
-                                       &request_elem->m_nSessionId,
-                                       request_elem->m_nIndex, result);
-        UA_CallMethodResult_clear(result);
-        deleteMethodQueueElement(request_elem);
+                       "Operation was removed due to a timeout");
     }
-    UA_UNLOCK(amm->ua_request_queue_lock);
-
-    /* Clear all pending */
-    UA_LOCK(amm->ua_pending_list_lock);
-    /* Check ifentry has been in the pendig list too long time */
-    while(!SIMPLEQ_EMPTY(&amm->ua_method_pending_list)) {
-        struct AsyncMethodQueueElement* request_elem = SIMPLEQ_FIRST(&amm->ua_method_pending_list);
-        UA_DateTime tNow = UA_DateTime_now();
-        UA_DateTime tReq = request_elem->m_tDispatchTime;
-        UA_DateTime diff = tNow - tReq;
-
-        /* list entry is not older than server->nMQTimeoutSecs, so we stop checking */
-        if(diff <= (UA_DateTime)(server->config.asyncOperationTimeout * UA_DATETIME_MSEC))
+
+    /* Loop over the queue of new ops */
+    TAILQ_FOREACH_SAFE(op, &am->newQueue, pointers, op_tmp) {
+        /* The timeout has not passed. Also for all elements following in the queue. */
+        if(tNow <= op->parent->timeout)
             break;
-            
-        /* Remove it from the list */
+
+        /* Mark as timed out and put it into the result queue */
+        op->response.statusCode = UA_STATUSCODE_BADTIMEOUT;
+        TAILQ_REMOVE(&am->newQueue, op, pointers);
+        TAILQ_INSERT_TAIL(&am->resultQueue, op, pointers);
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                       "UA_Server_CheckQueueIntegrity: Pending request #%u was removed "
-                       "due to a timeout (%f)", request_elem->m_nRequestId,
-                       server->config.asyncOperationTimeout);
-        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_pending_list, next);
-        /* Notify that we removed this request - e.g. Bad Call Response
-         * (UA_STATUSCODE_BADREQUESTTIMEOUT) */
-        UA_CallMethodResult* result = &request_elem->m_Response;
-        UA_CallMethodResult_clear(result);
-        result->statusCode = UA_STATUSCODE_BADREQUESTTIMEOUT;
-        UA_Server_InsertMethodResponse(server, request_elem->m_nRequestId,
-                                       &request_elem->m_nSessionId,
-                                       request_elem->m_nIndex, result);
-        UA_CallMethodResult_clear(result);
-        deleteMethodQueueElement(request_elem);
+                       "Operation was removed due to a timeout");
     }
-    UA_UNLOCK(amm->ua_pending_list_lock);
 
-    /* Now we check ifwe still have pending CallRequests */
-    UA_AsyncOperationManager_checkTimeouts(server, amm);
+    UA_UNLOCK(am->queueLock);
+
+    /* Integrate async results and send out complete responses */
+    processAsyncResults(server, NULL);
 }
 
 void
-UA_AsyncOperationManager_init(UA_AsyncOperationManager *amm, UA_Server *server) {
-    memset(amm, 0, sizeof(UA_AsyncOperationManager));
-    LIST_INIT(&amm->asyncOperations);
-
-    amm->nMQCurSize = 0;
-
-    SIMPLEQ_INIT(&amm->ua_method_request_queue);
-    SIMPLEQ_INIT(&amm->ua_method_response_queue);
-    SIMPLEQ_INIT(&amm->ua_method_pending_list);
-
-    UA_LOCK_INIT(amm->ua_request_queue_lock);
-    UA_LOCK_INIT(amm->ua_response_queue_lock);
-    UA_LOCK_INIT(amm->ua_pending_list_lock);
-
-    /* Add a regular callback for cleanup and maintenance using a 10s interval. */
-    UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_Server_CheckQueueIntegrity,
-                                  NULL, 10000.0, &amm->nCBIdIntegrity);
-
-    /* Add a regular callback for for checking responmses using a 50ms interval. */
-    UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_Server_CallMethodResponse,
-                                  NULL, 50.0, &amm->nCBIdResponse);
+UA_AsyncManager_init(UA_AsyncManager *am, UA_Server *server) {
+    memset(am, 0, sizeof(UA_AsyncManager));
+    TAILQ_INIT(&am->asyncResponses);
+    TAILQ_INIT(&am->newQueue);
+    TAILQ_INIT(&am->dispatchedQueue);
+    TAILQ_INIT(&am->resultQueue);
+    UA_LOCK_INIT(am->queueLock);
+
+    /* Add a regular callback for cleanup and sending finished responses at a
+     * 100s interval. */
+    UA_Server_addRepeatedCallback(server, (UA_ServerCallback)checkTimeouts,
+                                  NULL, 100.0, &am->checkTimeoutCallbackId);
 }
 
 void
-UA_AsyncOperationManager_clear(UA_AsyncOperationManager *amm, UA_Server *server) {
-    UA_Server_removeCallback(server, amm->nCBIdResponse);
-    UA_Server_removeCallback(server, amm->nCBIdIntegrity);
-
-    /* Clean up request queue */
-    UA_LOCK(amm->ua_request_queue_lock);
-    while(!SIMPLEQ_EMPTY(&amm->ua_method_request_queue)) {
-        struct AsyncMethodQueueElement* request = SIMPLEQ_FIRST(&amm->ua_method_request_queue);
-        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_request_queue, next);
-        deleteMethodQueueElement(request);
+UA_AsyncManager_clear(UA_AsyncManager *am, UA_Server *server) {
+    UA_Server_removeCallback(server, am->checkTimeoutCallbackId);
+
+    UA_AsyncOperation *ar;
+
+    /* Clean up queues */
+    UA_LOCK(am->queueLock);
+    while((ar = TAILQ_FIRST(&am->newQueue))) {
+        TAILQ_REMOVE(&am->resultQueue, ar, pointers);
+        UA_AsyncOperation_delete(ar);
     }
-    UA_UNLOCK(amm->ua_request_queue_lock);
-
-    /* Clean up response queue */
-    UA_LOCK(amm->ua_response_queue_lock);
-    while(!SIMPLEQ_EMPTY(&amm->ua_method_response_queue)) {
-        struct AsyncMethodQueueElement* response = SIMPLEQ_FIRST(&amm->ua_method_response_queue);
-        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_response_queue, next);
-        deleteMethodQueueElement(response);
+    while((ar = TAILQ_FIRST(&am->dispatchedQueue))) {
+        TAILQ_REMOVE(&am->resultQueue, ar, pointers);
+        UA_AsyncOperation_delete(ar);
     }
-    UA_UNLOCK(amm->ua_response_queue_lock);
-
-    /* Clear all pending */
-    UA_LOCK(amm->ua_pending_list_lock);
-    while(!SIMPLEQ_EMPTY(&amm->ua_method_pending_list)) {
-        struct AsyncMethodQueueElement* response = SIMPLEQ_FIRST(&amm->ua_method_pending_list);
-        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_pending_list, next);
-        deleteMethodQueueElement(response);
+    while((ar = TAILQ_FIRST(&am->resultQueue))) {
+        TAILQ_REMOVE(&am->resultQueue, ar, pointers);
+        UA_AsyncOperation_delete(ar);
     }
-    UA_UNLOCK(amm->ua_pending_list_lock);
-
-    /* Delete all locks */
-    UA_LOCK_DESTROY(amm->ua_response_queue_lock);
-    UA_LOCK_DESTROY(amm->ua_request_queue_lock);
-    UA_LOCK_DESTROY(amm->ua_pending_list_lock);
+    UA_UNLOCK(am->queueLock);
 
-    asyncOperationEntry *current, *temp;
-    LIST_FOREACH_SAFE(current, &amm->asyncOperations, pointers, temp) {
-        UA_AsyncOperationManager_removeEntry(amm, current);
+    /* Remove responses */
+    UA_AsyncResponse *current, *temp;
+    TAILQ_FOREACH_SAFE(current, &am->asyncResponses, pointers, temp) {
+        UA_AsyncManager_removeAsyncResponse(am, current);
     }
-}
 
-asyncOperationEntry *
-UA_AsyncOperationManager_getById(UA_AsyncOperationManager *amm, const UA_UInt32 requestId,
-                              const UA_NodeId *sessionId) {
-    asyncOperationEntry *current = NULL;
-    LIST_FOREACH(current, &amm->asyncOperations, pointers) {
-        if(current->requestId == requestId &&
-           UA_NodeId_equal(&current->sessionId, sessionId))
-            return current;
-    }
-    return NULL;
+    /* Delete all locks */
+    UA_LOCK_DESTROY(am->queueLock);
 }
 
 UA_StatusCode
-UA_AsyncOperationManager_createEntry(UA_AsyncOperationManager *amm, UA_Server *server,
-                                  const UA_NodeId *sessionId, const UA_UInt32 channelId,
-                                  const UA_UInt32 requestId, const UA_UInt32 requestHandle,
-                                  const UA_AsyncOperationType operationType,
-                                  const UA_UInt32 nCountdown) {
-    asyncOperationEntry *newentry = (asyncOperationEntry*)
-        UA_calloc(1, sizeof(asyncOperationEntry));
-    if(!newentry) {
-        UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                     "UA_AsyncOperationManager_createEntry: Mem alloc failed.");
+UA_AsyncManager_createAsyncResponse(UA_AsyncManager *am, UA_Server *server,
+                                    const UA_NodeId *sessionId,
+                                    const UA_UInt32 requestId, const UA_UInt32 requestHandle,
+                                    const UA_AsyncOperationType operationType,
+                                    UA_AsyncResponse **outAr) {
+    UA_AsyncResponse *newentry = (UA_AsyncResponse*)UA_calloc(1, sizeof(UA_AsyncResponse));
+    if(!newentry)
         return UA_STATUSCODE_BADOUTOFMEMORY;
-    }
 
     UA_StatusCode res = UA_NodeId_copy(sessionId, &newentry->sessionId);
     if(res != UA_STATUSCODE_GOOD) {
-        UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                     "UA_AsyncOperationManager_createEntry: Mem alloc failed.");
         UA_free(newentry);
         return res;
     }
 
-    UA_atomic_addUInt32(&amm->currentCount, 1);
+    am->asyncResponsesCount += 1;
     newentry->requestId = requestId;
     newentry->requestHandle = requestHandle;
-    newentry->nCountdown = nCountdown;
-    newentry->dispatchTime = UA_DateTime_now();
-    UA_CallResponse_init(&newentry->response.callResponse);
-    newentry->response.callResponse.results = (UA_CallMethodResult*)
-        UA_calloc(nCountdown, sizeof(UA_CallMethodResult));
-    newentry->response.callResponse.resultsSize = nCountdown;
-    if(newentry->response.callResponse.results == NULL) {
-        UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                     "UA_AsyncOperationManager_createEntry: Mem alloc failed.");
-        UA_free(newentry);
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-    }
-
-    /* Set the StatusCode to timeout by default. Will be overwritten when the
-     * result is set. */
-    for(size_t i = 0; i < nCountdown; i++)
-        newentry->response.callResponse.results[i].statusCode = UA_STATUSCODE_BADTIMEOUT;
-
-    LIST_INSERT_HEAD(&amm->asyncOperations, newentry, pointers);
-
-    UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                 "UA_AsyncOperationManager_createEntry: Chan: %u. Req# %u", channelId, requestId);
+    newentry->timeout = UA_DateTime_now();
+    if(server->config.asyncOperationTimeout > 0.0)
+        newentry->timeout += (UA_DateTime)
+            (server->config.asyncOperationTimeout * (UA_DateTime)UA_DATETIME_MSEC);
+    TAILQ_INSERT_TAIL(&am->asyncResponses, newentry, pointers);
 
+    *outAr = newentry;
     return UA_STATUSCODE_GOOD;
 }
 
 /* Remove entry and free all allocated data */
 void
-UA_AsyncOperationManager_removeEntry(UA_AsyncOperationManager *amm,
-                                  asyncOperationEntry *current) {
-    UA_assert(current);
-    LIST_REMOVE(current, pointers);
-    UA_atomic_subUInt32(&amm->currentCount, 1);
-    UA_CallResponse_clear(&current->response.callResponse);
-    UA_NodeId_clear(&current->sessionId);
-    UA_free(current);
-}
-
-/* Check if CallRequest is waiting way too long (120s) */
-void
-UA_AsyncOperationManager_checkTimeouts(UA_Server *server, UA_AsyncOperationManager *amm) {
-    asyncOperationEntry* current = NULL;
-    asyncOperationEntry* current_tmp = NULL;
-    LIST_FOREACH_SAFE(current, &amm->asyncOperations, pointers, current_tmp) {
-        UA_DateTime tNow = UA_DateTime_now();
-        UA_DateTime tReq = current->dispatchTime;
-        UA_DateTime diff = tNow - tReq;
-
-        /* The calls are all done or the timeout has not passed */
-        if (current->nCountdown == 0 || server->config.asyncCallRequestTimeout <= 0.0 ||
-            diff <= server->config.asyncCallRequestTimeout * (UA_DateTime)UA_DATETIME_MSEC)
-            continue;
-
-        /* We got an unfinished CallResponse waiting way too long for being finished.
-         * Set the remaining StatusCodes and return. */
-        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                       "UA_AsyncOperationManager_checkTimeouts: "
-                       "RequestCall #%u was removed due to a timeout (120s)", current->requestId);
-
-        /* Get the session */
-        UA_LOCK(server->serviceMutex);
-        UA_Session* session = UA_SessionManager_getSessionById(&server->sessionManager,
-                                                               &current->sessionId);
-        UA_UNLOCK(server->serviceMutex);
-        if(!session) {
-            UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                           "UA_AsyncOperationManager_checkTimeouts: Session is gone");
-            goto remove;
-        }
-
-        /* Check the channel */
-        if(!session->header.channel) {
-            UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                           "UA_Server_InsertMethodResponse: Channel is gone");
-            goto remove;
-        }
-
-        /* Okay, here we go, send the UA_CallResponse */
-        sendResponse(session->header.channel, current->requestId, current->requestHandle,
-                     (UA_ResponseHeader*)&current->response.callResponse.responseHeader,
-                     &UA_TYPES[UA_TYPES_CALLRESPONSE]);
-        UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                     "UA_Server_SendResponse: Response for Req# %u sent", current->requestId);
-    remove:
-        UA_AsyncOperationManager_removeEntry(amm, current);
-    }
+UA_AsyncManager_removeAsyncResponse(UA_AsyncManager *am, UA_AsyncResponse *ar) {
+    TAILQ_REMOVE(&am->asyncResponses, ar, pointers);
+    am->asyncResponsesCount -= 1;
+    UA_CallResponse_clear(&ar->response.callResponse);
+    UA_NodeId_clear(&ar->sessionId);
+    UA_free(ar);
 }
 
-/***************/
-/* MethodQueue */
-/***************/
-
 /* Enqueue next MethodRequest */
 UA_StatusCode
-UA_Server_SetNextAsyncMethod(UA_Server *server, const UA_UInt32 nRequestId,
-                             const UA_NodeId *nSessionId, const UA_UInt32 nIndex,
-                             const UA_CallMethodRequest *pRequest) {
-    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
-
+UA_AsyncManager_createAsyncOp(UA_AsyncManager *am, UA_Server *server,
+                              UA_AsyncResponse *ar, size_t opIndex,
+                              const UA_CallMethodRequest *opRequest) {
     if(server->config.maxAsyncOperationQueueSize != 0 &&
-        amm->nMQCurSize >= server->config.maxAsyncOperationQueueSize) {
+       am->opsCount >= server->config.maxAsyncOperationQueueSize) {
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
                        "UA_Server_SetNextAsyncMethod: Queue exceeds limit (%d).",
                        (UA_UInt32)server->config.maxAsyncOperationQueueSize);
         return UA_STATUSCODE_BADUNEXPECTEDERROR;
     }
 
-    struct AsyncMethodQueueElement* elem = (struct AsyncMethodQueueElement*)
-        UA_calloc(1, sizeof(struct AsyncMethodQueueElement));
-    if(!elem) {
+    UA_AsyncOperation *ao = (UA_AsyncOperation*)UA_calloc(1, sizeof(UA_AsyncOperation));
+    if(!ao) {
         UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
                      "UA_Server_SetNextAsyncMethod: Mem alloc failed.");
         return UA_STATUSCODE_BADOUTOFMEMORY;
     }
 
-    UA_StatusCode result = UA_CallMethodRequest_copy(pRequest, &elem->m_Request);
+    UA_StatusCode result = UA_CallMethodRequest_copy(opRequest, &ao->request);
     if(result != UA_STATUSCODE_GOOD) {
         UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
                      "UA_Server_SetAsyncMethodResult: UA_CallMethodRequest_copy failed.");                
-        UA_free(elem);
+        UA_free(ao);
         return result;
     }
 
-    UA_CallMethodResult_init(&elem->m_Response);
-    elem->m_nRequestId = nRequestId;
-    elem->m_nSessionId = *nSessionId;
-    elem->m_nIndex = nIndex;
-    elem->m_tDispatchTime = UA_DateTime_now();
+    UA_CallMethodResult_init(&ao->response);
+    ao->index = opIndex;
+    ao->parent = ar;
 
-    UA_LOCK(amm->ua_request_queue_lock);
-    SIMPLEQ_INSERT_TAIL(&amm->ua_method_request_queue, elem, next);
-    amm->nMQCurSize++;
-    UA_UNLOCK(amm->ua_request_queue_lock);
+    UA_LOCK(am->queueLock);
+    TAILQ_INSERT_TAIL(&am->newQueue, ao, pointers);
+    am->opsCount++;
+    ar->opCountdown++;
+    UA_UNLOCK(am->queueLock);
 
     if(server->config.asyncOperationNotifyCallback)
         server->config.asyncOperationNotifyCallback(server);
@@ -347,94 +276,27 @@ UA_Server_SetNextAsyncMethod(UA_Server *server, const UA_UInt32 nRequestId,
     return UA_STATUSCODE_GOOD;
 }
 
-/* Deep delete queue Element - only memory we did allocate */
-void
-deleteMethodQueueElement(struct AsyncMethodQueueElement *pElem) {
-    UA_CallMethodRequest_clear(&pElem->m_Request);
-    UA_CallMethodResult_clear(&pElem->m_Response);
-    UA_free(pElem);
-}
-
-void
-UA_AsyncOperationManager_addPendingMethodCall(UA_AsyncOperationManager *amm,
-                                              struct AsyncMethodQueueElement *pElem) {
-    UA_LOCK(amm->ua_pending_list_lock);
-    pElem->m_tDispatchTime = UA_DateTime_now(); /* reset timestamp for timeout */
-    SIMPLEQ_INSERT_TAIL(&amm->ua_method_pending_list, pElem, next);
-    UA_UNLOCK(amm->ua_pending_list_lock);
-}
-
-void
-UA_AsyncOperationManager_rmvPendingMethodCall(UA_AsyncOperationManager *amm,
-                                              struct AsyncMethodQueueElement *pElem) {
-    /* Remove element from pending list */
-    /* Do NOT delete it because we still need it */
-    struct AsyncMethodQueueElement* current = NULL;
-    struct AsyncMethodQueueElement* tmp_iter = NULL;
-    struct AsyncMethodQueueElement* previous = NULL;
-    UA_LOCK(amm->ua_pending_list_lock);
-    SIMPLEQ_FOREACH_SAFE(current, &amm->ua_method_pending_list, next, tmp_iter) {
-        if(pElem == current) {
-            if(previous == NULL)
-                SIMPLEQ_REMOVE_HEAD(&amm->ua_method_pending_list, next);
-            else
-                SIMPLEQ_REMOVE_AFTER(&amm->ua_method_pending_list, previous, next);
-            break;
-        }
-        previous = current;
-    }
-    UA_UNLOCK(amm->ua_pending_list_lock);
-    return;
-}
-
-UA_Boolean
-UA_AsyncOperationManager_isPendingMethodCall(UA_AsyncOperationManager *amm,
-                                             struct AsyncMethodQueueElement *pElem) {
-    UA_Boolean bRV = UA_FALSE;
-    struct AsyncMethodQueueElement* current = NULL;
-    struct AsyncMethodQueueElement* tmp_iter = NULL;
-    UA_LOCK(amm->ua_pending_list_lock);
-    SIMPLEQ_FOREACH_SAFE(current, &amm->ua_method_pending_list, next, tmp_iter) {
-        if(pElem == current) {
-            bRV = UA_TRUE;
-            break;
-        }
-    }
-    UA_UNLOCK(amm->ua_pending_list_lock);
-    return bRV;
-}
-
 /* Get and remove next Method Call Request */
 UA_Boolean
 UA_Server_getAsyncOperation(UA_Server *server, UA_AsyncOperationType *type,
                             const UA_AsyncOperationRequest **request,
                             void **context) {
-    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
+    UA_AsyncManager *am = &server->asyncManager;
 
-    UA_Boolean bRV = UA_FALSE;
+    UA_Boolean bRV = false;
     *type = UA_ASYNCOPERATIONTYPE_INVALID;
-    struct AsyncMethodQueueElement *elem = NULL;
-    UA_LOCK(amm->ua_request_queue_lock);
-    if(!SIMPLEQ_EMPTY(&amm->ua_method_request_queue)) {
-        elem = SIMPLEQ_FIRST(&amm->ua_method_request_queue);
-        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_request_queue, next);
-        amm->nMQCurSize--;
-        if(elem) {
-            *request = (UA_AsyncOperationRequest*)&elem->m_Request;
-            *context = (void*)elem;            
-            bRV = UA_TRUE;
-        }
-        else {
-            UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                "UA_amm_GetNextAsyncMethod: elem is a NULL-Pointer.");
-        }
-    }
-    UA_UNLOCK(amm->ua_request_queue_lock);
-
-    if(bRV && elem) {
+    UA_LOCK(am->queueLock);
+    UA_AsyncOperation *ao = TAILQ_FIRST(&am->newQueue);
+    if(ao) {
+        TAILQ_REMOVE(&am->newQueue, ao, pointers);
+        TAILQ_INSERT_TAIL(&am->dispatchedQueue, ao, pointers);
         *type = UA_ASYNCOPERATIONTYPE_CALL;
-        UA_AsyncOperationManager_addPendingMethodCall(amm, elem);
+        *request = (UA_AsyncOperationRequest*)&ao->request;
+        *context = (void*)ao;
+        bRV = true;
     }
+    UA_UNLOCK(am->queueLock);
+
     return bRV;
 }
 
@@ -443,122 +305,61 @@ void
 UA_Server_setAsyncOperationResult(UA_Server *server,
                                   const UA_AsyncOperationResponse *response,
                                   void *context) {
-    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
+    UA_AsyncManager *am = &server->asyncManager;
 
-    struct AsyncMethodQueueElement* elem = (struct AsyncMethodQueueElement*)context;
-    if(!elem || !UA_AsyncOperationManager_isPendingMethodCall(amm, elem) ) {
-        /* Something went wrong, late call? */
-        /* Dismiss response */
+    UA_AsyncOperation *ao = (UA_AsyncOperation*)context;
+    if(!ao) {
+        /* Something went wrong. Not a good AsyncOp. */
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-            "UA_Server_SetAsyncMethodResult: elem is a NULL-Pointer or not valid anymore.");
+                       "UA_Server_SetAsyncMethodResult: Invalid context");
         return;
     }
-    
-    /* UA_Server_RmvPendingMethodCall MUST be called outside the lock
-     * otherwise we can run into a deadlock */
-    UA_AsyncOperationManager_rmvPendingMethodCall(amm, elem);
 
-    UA_StatusCode result = UA_CallMethodResult_copy(&response->callMethodResult,
-                                                    &elem->m_Response);
-    if(result != UA_STATUSCODE_GOOD) {
-        UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                     "UA_Server_SetAsyncMethodResult: UA_CallMethodResult_copy failed.");
-        /* Add failed CallMethodResult to response queue */
-        UA_CallMethodResult_clear(&elem->m_Response);
-        elem->m_Response.statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
+    UA_LOCK(am->queueLock);
+
+    /* See if the operation is still in the dispatched queue. Otherwise it has
+     * been removed due to a timeout.
+     *
+     * TODO: Add a tree-structure for the dispatch queue. The linear lookup does
+     * not scale. */
+    UA_Boolean found = false;
+    UA_AsyncOperation *op = NULL;
+    TAILQ_FOREACH(op, &am->dispatchedQueue, pointers) {
+        if(op == ao) {
+            found = true;
+            break;
+        }
     }
 
-    /* Insert response in queue */
-    UA_LOCK(amm->ua_response_queue_lock);
-    SIMPLEQ_INSERT_TAIL(&amm->ua_method_response_queue, elem, next);
-    UA_UNLOCK(amm->ua_response_queue_lock);
-}
-
-/******************/
-/* Server Methods */
-/******************/
-
-void
-UA_Server_InsertMethodResponse(UA_Server *server, const UA_UInt32 nRequestId,
-                               const UA_NodeId *nSessionId, const UA_UInt32 nIndex,
-                               const UA_CallMethodResult *response) {
-    /* Grab the open Request, so we can continue to construct the response */
-    asyncOperationEntry *data =
-        UA_AsyncOperationManager_getById(&server->asyncMethodManager, nRequestId, nSessionId);
-    if(!data) {
+    if(!found) {
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                       "UA_Server_InsertMethodResponse: can not find UA_CallRequest/UA_CallResponse "
-                       "for Req# %u", nRequestId);
+                       "UA_Server_SetAsyncMethodResult: The operation has timed out");
+        UA_UNLOCK(am->queueLock);
         return;
     }
 
-    /* Add UA_CallMethodResult to UA_CallResponse */
-    UA_CallResponse* pResponse = &data->response.callResponse;
-    UA_CallMethodResult_copy(response, pResponse->results + nIndex);
-
-    /* Reduce the number of open results. Are we done yet with all requests? */
-    data->nCountdown -= 1;
-    if(data->nCountdown > 0)
-        return;
-    
-    /* Get the session */
-    UA_LOCK(server->serviceMutex);
-    UA_Session* session = UA_SessionManager_getSessionById(&server->sessionManager, &data->sessionId);
-    UA_UNLOCK(server->serviceMutex);
-    if(!session) {
+    /* Copy the result into the internal AsyncOperation */
+    UA_StatusCode result =
+        UA_CallMethodResult_copy(&response->callMethodResult, &ao->response);
+    if(result != UA_STATUSCODE_GOOD) {
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                       "UA_Server_InsertMethodResponse: Session is gone");
-        UA_AsyncOperationManager_removeEntry(&server->asyncMethodManager, data);
-        return;
+                       "UA_Server_SetAsyncMethodResult: UA_CallMethodResult_copy failed.");
+        ao->response.statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
     }
 
-    /* Check the channel */
-    UA_SecureChannel* channel = session->header.channel;
-    if(!channel) {
-        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                       "UA_Server_InsertMethodResponse: Channel is gone");
-        UA_AsyncOperationManager_removeEntry(&server->asyncMethodManager, data);
-        return;
-    }
+    /* Move to the result queue */
+    TAILQ_REMOVE(&am->dispatchedQueue, ao, pointers);
+    TAILQ_INSERT_TAIL(&am->resultQueue, ao, pointers);
 
-    /* Okay, here we go, send the UA_CallResponse */
-    sendResponse(channel, data->requestId, data->requestHandle,
-                 (UA_ResponseHeader*)&data->response.callResponse.responseHeader,
-                 &UA_TYPES[UA_TYPES_CALLRESPONSE]);
-    UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                 "UA_Server_SendResponse: Response for Req# %u sent", data->requestId);
-    /* Remove this job from the UA_AsyncOperationManager */
-    UA_AsyncOperationManager_removeEntry(&server->asyncMethodManager, data);
-}
+    UA_UNLOCK(am->queueLock);
 
-/* Get next Method Call Response, user has to call
- * 'UA_DeleteMethodQueueElement(...)' to cleanup memory */
-struct AsyncMethodQueueElement *
-UA_AsyncOperationManager_getAsyncMethodResult(UA_AsyncOperationManager *amm) {
-    struct AsyncMethodQueueElement *elem = NULL;
-    UA_LOCK(amm->ua_response_queue_lock);
-    if(!SIMPLEQ_EMPTY(&amm->ua_method_response_queue)) {
-        elem = SIMPLEQ_FIRST(&amm->ua_method_response_queue);
-        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_response_queue, next);
-    }
-    UA_UNLOCK(amm->ua_response_queue_lock);
-    return elem;
+    UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Set the result from the worker thread");
 }
 
-void
-UA_Server_CallMethodResponse(UA_Server *server, void* data) {
-    /* Server fetches Result from queue */
-    struct AsyncMethodQueueElement* pResponseServer = NULL;
-    while((pResponseServer = UA_AsyncOperationManager_getAsyncMethodResult(&server->asyncMethodManager))) {
-        UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                     "UA_Server_CallMethodResponse: Got Response: OKAY");
-        UA_Server_InsertMethodResponse(server, pResponseServer->m_nRequestId,
-                                       &pResponseServer->m_nSessionId,
-                                       pResponseServer->m_nIndex,
-                                       &pResponseServer->m_Response);
-        deleteMethodQueueElement(pResponseServer);
-    }
-}
+/******************/
+/* Server Methods */
+/******************/
 
 static UA_StatusCode
 setMethodNodeAsync(UA_Server *server, UA_Session *session,
@@ -577,36 +378,36 @@ UA_Server_setMethodNodeAsync(UA_Server *server, const UA_NodeId id,
                               (UA_EditNodeCallback)setMethodNodeAsync, &isAsync);
 }
 
-/* this is a copy of the above + contest.nIndex is set :-( Any ideas for a better solution? */
 UA_StatusCode
 UA_Server_processServiceOperationsAsync(UA_Server *server, UA_Session *session,
-    UA_ServiceOperation operationCallback,
-    void *context, const size_t *requestOperations,
-    const UA_DataType *requestOperationsType,
-    size_t *responseOperations,
-    const UA_DataType *responseOperationsType) {
+                                        UA_UInt32 requestId, UA_UInt32 requestHandle,
+                                        UA_AsyncServiceOperation operationCallback,
+                                        const size_t *requestOperations,
+                                        const UA_DataType *requestOperationsType,
+                                        size_t *responseOperations,
+                                        const UA_DataType *responseOperationsType,
+                                        UA_AsyncResponse **ar) {
     size_t ops = *requestOperations;
-    if (ops == 0)
+    if(ops == 0)
         return UA_STATUSCODE_BADNOTHINGTODO;
 
-    struct AsyncMethodContextInternal* pContext = (struct AsyncMethodContextInternal*)context;
-
-    /* No padding after size_t */
+    /* Allocate the response array. No padding after size_t */
     void **respPos = (void**)((uintptr_t)responseOperations + sizeof(size_t));
     *respPos = UA_Array_new(ops, responseOperationsType);
-    if (!(*respPos))
+    if(!*respPos)
         return UA_STATUSCODE_BADOUTOFMEMORY;
-
     *responseOperations = ops;
+
+    /* Finish / dispatch the operations. This may allocate a new AsyncResponse internally */
     uintptr_t respOp = (uintptr_t)*respPos;
-    /* No padding after size_t */
     uintptr_t reqOp = *(uintptr_t*)((uintptr_t)requestOperations + sizeof(size_t));
-    for (size_t i = 0; i < ops; i++) {
-        pContext->nIndex = (UA_UInt32)i;
-        operationCallback(server, session, context, (void*)reqOp, (void*)respOp);
+    for(size_t i = 0; i < ops; i++) {
+        operationCallback(server, session, requestId, requestHandle,
+                          i, (void*)reqOp, (void*)respOp, ar);
         reqOp += requestOperationsType->memSize;
         respOp += responseOperationsType->memSize;
     }
+
     return UA_STATUSCODE_GOOD;
 }
 

+ 118 - 0
src/server/ua_server_async.h

@@ -0,0 +1,118 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+  *    Copyright 2019 (c) Fraunhofer IOSB (Author: Klaus Schick)
+ * based on
+ *    Copyright 2014-2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
+ *    Copyright 2014, 2017 (c) Florian Palm
+ *    Copyright 2015 (c) Sten Grüner
+ *    Copyright 2015 (c) Oleksiy Vasylyev
+ *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
+ */
+
+#ifndef UA_SERVER_ASYNC_H_
+#define UA_SERVER_ASYNC_H_
+
+#include <open62541/server.h>
+
+#include "open62541_queue.h"
+#include "ua_util_internal.h"
+
+_UA_BEGIN_DECLS
+
+#if UA_MULTITHREADING >= 100
+
+struct UA_AsyncResponse;
+typedef struct UA_AsyncResponse UA_AsyncResponse;
+
+/* A single operation (of a larger request) */
+typedef struct UA_AsyncOperation {
+    TAILQ_ENTRY(UA_AsyncOperation) pointers;
+    UA_CallMethodRequest request;
+    UA_CallMethodResult	response;
+    size_t index;             /* Index of the operation in the array of ops in
+                               * request/response */
+    UA_AsyncResponse *parent; /* Always non-NULL. The parent is only removed
+                               * when its operations are removed */
+} UA_AsyncOperation;
+
+struct UA_AsyncResponse {
+    TAILQ_ENTRY(UA_AsyncResponse) pointers; /* Insert new at the end */
+    UA_UInt32 requestId;
+    UA_NodeId sessionId;
+    UA_UInt32 requestHandle;
+    UA_DateTime	timeout;
+    UA_AsyncOperationType operationType;
+    union {
+        UA_CallResponse callResponse;
+        UA_ReadResponse readResponse;
+        UA_WriteResponse writeResponse;
+    } response;
+    UA_UInt32 opCountdown; /* Counter for outstanding operations. The AR can
+                            * only be deleted when all have returned. */
+};
+
+typedef TAILQ_HEAD(UA_AsyncOperationQueue, UA_AsyncOperation) UA_AsyncOperationQueue;
+
+typedef struct {
+    /* Requests / Responses */
+    TAILQ_HEAD(, UA_AsyncResponse) asyncResponses;
+    size_t asyncResponsesCount;
+
+    /* Operations for the workers. The queues are all FIFO: Put in at the tail,
+     * take out at the head.*/
+    UA_LOCK_TYPE(queueLock)
+    UA_AsyncOperationQueue newQueue;        /* New operations for the workers */    
+    UA_AsyncOperationQueue dispatchedQueue; /* Operations taken by a worker. When a result is
+                                             * returned, we search for the op here to see if it
+                                             * is still "alive" (not timed out). */
+    UA_AsyncOperationQueue resultQueue;     /* Results to be integrated */
+    size_t opsCount; /* How many operations are transient (in one of the three queues)? */
+
+    UA_UInt64 checkTimeoutCallbackId; /* Registered repeated callbacks */
+} UA_AsyncManager;
+
+void UA_AsyncManager_init(UA_AsyncManager *am, UA_Server *server);
+void UA_AsyncManager_clear(UA_AsyncManager *am, UA_Server *server);
+
+UA_StatusCode
+UA_AsyncManager_createAsyncResponse(UA_AsyncManager *am, UA_Server *server,
+                                    const UA_NodeId *sessionId,
+                                    const UA_UInt32 requestId,
+                                    const UA_UInt32 requestHandle,
+                                    const UA_AsyncOperationType operationType,
+                                    UA_AsyncResponse **outAr);
+
+/* Only remove the AsyncResponse when the operation count is zero */
+void
+UA_AsyncManager_removeAsyncResponse(UA_AsyncManager *am, UA_AsyncResponse *ar);
+
+UA_StatusCode
+UA_AsyncManager_createAsyncOp(UA_AsyncManager *am, UA_Server *server,
+                              UA_AsyncResponse *ar, size_t opIndex,
+                              const UA_CallMethodRequest *opRequest);
+
+typedef void (*UA_AsyncServiceOperation)(UA_Server *server, UA_Session *session,
+                                         UA_UInt32 requestId, UA_UInt32 requestHandle,
+                                         size_t opIndex, const void *requestOperation,
+                                         void *responseOperation, UA_AsyncResponse **ar);
+
+/* Creates an AsyncResponse in-situ when an async operation is encountered. If
+ * that is the case, the sync responses are moved to the AsyncResponse. */
+UA_StatusCode
+UA_Server_processServiceOperationsAsync(UA_Server *server, UA_Session *session,
+                                        UA_UInt32 requestId, UA_UInt32 requestHandle,
+                                        UA_AsyncServiceOperation operationCallback,
+                                        const size_t *requestOperations,
+                                        const UA_DataType *requestOperationsType,
+                                        size_t *responseOperations,
+                                        const UA_DataType *responseOperationsType,
+                                        UA_AsyncResponse **ar)
+UA_FUNC_ATTR_WARN_UNUSED_RESULT;
+
+#endif /* UA_MULTITHREADING >= 100 */
+
+_UA_END_DECLS
+
+#endif /* UA_SERVER_ASYNC_H_ */

+ 14 - 17
src/server/ua_server_binary.c

@@ -550,24 +550,21 @@ processMSGDecoded(UA_Server *server, UA_SecureChannel *channel, UA_UInt32 reques
 #endif
 
 #if UA_MULTITHREADING >= 100
-    /* If marked as async, the call request is not answered immediately, unless
-     * there is an error */
+    /* The call request might not be answered immediately */
     if(requestType == &UA_TYPES[UA_TYPES_CALLREQUEST]) {
-        responseHeader->serviceResult =
-            UA_AsyncOperationManager_createEntry(&server->asyncMethodManager, server,
-                                              &session->sessionId, channel->securityToken.channelId,
-                                              requestId, requestHeader->requestHandle,
-                                              UA_ASYNCOPERATIONTYPE_CALL,
-                                              (UA_UInt32)((const UA_CallRequest*)requestHeader)->methodsToCallSize);
-        if(responseHeader->serviceResult == UA_STATUSCODE_GOOD)
-            Service_CallAsync(server, session, channel, requestId,
-                              (const UA_CallRequest*)requestHeader, (UA_CallResponse*)responseHeader);
-
-        /* We got an error, so send response directly */
-        if(responseHeader->serviceResult != UA_STATUSCODE_GOOD)
-            return sendResponse(channel, requestId, requestHeader->requestHandle,
-                                responseHeader, responseType);
-        return UA_STATUSCODE_GOOD;
+        UA_Boolean finished = true;
+        UA_LOCK(server->serviceMutex);
+        Service_CallAsync(server, session, requestId, (const UA_CallRequest*)requestHeader,
+                          (UA_CallResponse*)responseHeader, &finished);
+        UA_UNLOCK(server->serviceMutex);
+
+        /* Async method calls remain. Don't send a response now */
+        if(!finished)
+            return UA_STATUSCODE_GOOD;
+
+        /* We are done here */
+        return sendResponse(channel, requestId, requestHeader->requestHandle,
+                            responseHeader, responseType);
     }
 #endif
 

+ 2 - 23
src/server/ua_server_internal.h

@@ -23,7 +23,7 @@
 #include "ua_connection_internal.h"
 #include "ua_securechannel_manager.h"
 #include "ua_session_manager.h"
-#include "ua_asyncoperation_manager.h"
+#include "ua_server_async.h"
 #include "ua_timer.h"
 #include "ua_util_internal.h"
 #include "ua_workqueue.h"
@@ -75,7 +75,7 @@ struct UA_Server {
     UA_SecureChannelManager secureChannelManager;
     UA_SessionManager sessionManager;
 #if UA_MULTITHREADING >= 100
-    UA_AsyncOperationManager asyncMethodManager;
+    UA_AsyncManager asyncManager;
 #endif
     UA_Session adminSession; /* Local access to the services (for startup and
                               * maintenance) uses this Session with all possible
@@ -192,17 +192,6 @@ UA_StatusCode
 writeWithSession(UA_Server *server, UA_Session *session,
                  const UA_WriteValue *value);
 
-#if UA_MULTITHREADING >= 100
-
-void
-UA_Server_InsertMethodResponse(UA_Server *server, const UA_UInt32 nRequestId,
-                               const UA_NodeId* nSessionId, const UA_UInt32 nIndex,
-                               const UA_CallMethodResult* response);
-void
-UA_Server_CallMethodResponse(UA_Server *server, void* data);
-
-#endif
-
 UA_StatusCode
 sendResponse(UA_SecureChannel *channel, UA_UInt32 requestId, UA_UInt32 requestHandle,
              UA_ResponseHeader *responseHeader, const UA_DataType *responseType);
@@ -224,16 +213,6 @@ UA_Server_processServiceOperations(UA_Server *server, UA_Session *session,
                                    const UA_DataType *responseOperationsType)
     UA_FUNC_ATTR_WARN_UNUSED_RESULT;
 
-UA_StatusCode
-UA_Server_processServiceOperationsAsync(UA_Server *server, UA_Session *session,
-                                        UA_ServiceOperation operationCallback,
-                                        void *context,
-                                        const size_t *requestOperations,
-                                        const UA_DataType *requestOperationsType,
-                                        size_t *responseOperations,
-                                        const UA_DataType *responseOperationsType)
-UA_FUNC_ATTR_WARN_UNUSED_RESULT;
-
 /******************************************/
 /* Internal function calls, without locks */
 /******************************************/

+ 3 - 3
src/server/ua_services.h

@@ -358,9 +358,9 @@ void Service_Call(UA_Server *server, UA_Session *session,
                   UA_CallResponse *response);
 
 # if UA_MULTITHREADING >= 100
-void Service_CallAsync(UA_Server *server, UA_Session *session, UA_SecureChannel* channel,
-                       UA_UInt32 requestId, const UA_CallRequest *request,
-                       UA_CallResponse *response);
+void Service_CallAsync(UA_Server *server, UA_Session *session, UA_UInt32 requestId,
+                       const UA_CallRequest *request, UA_CallResponse *response,
+                       UA_Boolean *finished);
 #endif
 #endif
 

+ 56 - 40
src/server/ua_services_method.c

@@ -232,58 +232,61 @@ callWithMethodAndObject(UA_Server *server, UA_Session *session,
 #if UA_MULTITHREADING >= 100
 
 static void
-Operation_CallMethodAsync(UA_Server *server, UA_Session *session, void *context,
-    const UA_CallMethodRequest *request, UA_CallMethodResult *result) {
-    struct AsyncMethodContextInternal *pContext = (struct AsyncMethodContextInternal*)context;
-
+Operation_CallMethodAsync(UA_Server *server, UA_Session *session, UA_UInt32 requestId,
+                          UA_UInt32 requestHandle, size_t opIndex,
+                          UA_CallMethodRequest *opRequest, UA_CallMethodResult *opResult,
+                          UA_AsyncResponse **ar) {
     /* Get the method node */
     const UA_MethodNode *method = (const UA_MethodNode*)
-        UA_NODESTORE_GET(server, &request->methodId);
+        UA_NODESTORE_GET(server, &opRequest->methodId);
     if(!method) {
-        result->statusCode = UA_STATUSCODE_BADNODEIDUNKNOWN;
+        opResult->statusCode = UA_STATUSCODE_BADNODEIDUNKNOWN;
         return;
     }
 
     /* Get the object node */
     const UA_ObjectNode *object = (const UA_ObjectNode*)
-        UA_NODESTORE_GET(server, &request->objectId);
+        UA_NODESTORE_GET(server, &opRequest->objectId);
     if(!object) {
-        result->statusCode = UA_STATUSCODE_BADNODEIDUNKNOWN;
+        opResult->statusCode = UA_STATUSCODE_BADNODEIDUNKNOWN;
         UA_NODESTORE_RELEASE(server, (const UA_Node*)method);
         return;
     }
 
-    if(method->async) {
-        /* Async case */        
-        UA_StatusCode res = UA_Server_SetNextAsyncMethod(server, pContext->nRequestId,
-                                                         &pContext->nSessionId, pContext->nIndex, request);
-        if(res != UA_STATUSCODE_GOOD) {
-            UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                "Operation_CallMethodAsync: Adding request to queue: FAILED");
-            /* Set this Request as failed */
-            UA_CallMethodResult_clear(result);
-            result->statusCode = res;
-            UA_Server_InsertMethodResponse(server, pContext->nRequestId,
-                                           &pContext->nSessionId, pContext->nIndex, result);
-            UA_CallMethodResult_clear(result);
-        }
+    /* Synchronous execution */
+    if(!method->async) {
+        callWithMethodAndObject(server, session, opRequest, opResult, method, object);
+        goto cleanup;
     }
-    else {
-        /* Sync execution case, continue with method and object as context */
-        callWithMethodAndObject(server, session, request, result, method, object);
-        UA_Server_InsertMethodResponse(server, pContext->nRequestId,
-                                       &pContext->nSessionId, pContext->nIndex, result);
-        UA_CallMethodResult_clear(result);
+
+    /* <-- Async method call --> */
+
+    /* No AsyncResponse allocated so far */
+    if(!*ar) {
+        opResult->statusCode =
+            UA_AsyncManager_createAsyncResponse(&server->asyncManager, server,
+                                                &session->sessionId, requestId,
+                                                requestHandle, UA_ASYNCOPERATIONTYPE_CALL,
+                                                ar);
+        if(opResult->statusCode != UA_STATUSCODE_GOOD)
+            goto cleanup;
     }
 
+    /* Create the Async Request to be taken by workers */
+    opResult->statusCode =
+        UA_AsyncManager_createAsyncOp(&server->asyncManager,
+                                      server, *ar, opIndex, opRequest);
+
+ cleanup:
     /* Release the method and object node */
     UA_NODESTORE_RELEASE(server, (const UA_Node*)method);
     UA_NODESTORE_RELEASE(server, (const UA_Node*)object);
 }
 
-void Service_CallAsync(UA_Server *server, UA_Session *session,
-                       UA_SecureChannel* channel, UA_UInt32 requestId,
-                       const UA_CallRequest *request, UA_CallResponse *response) {
+void
+Service_CallAsync(UA_Server *server, UA_Session *session, UA_UInt32 requestId,
+                  const UA_CallRequest *request, UA_CallResponse *response,
+                  UA_Boolean *finished) {
     UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Processing CallRequestAsync");
     if(server->config.maxNodesPerMethodCall != 0 &&
         request->methodsToCallSize > server->config.maxNodesPerMethodCall) {
@@ -291,16 +294,29 @@ void Service_CallAsync(UA_Server *server, UA_Session *session,
         return;
     }
 
-    struct AsyncMethodContextInternal context;
-    context.nRequestId = requestId;
-    context.nSessionId = session->sessionId;
-    context.pRequest = request;
-    context.pChannel = (UA_SecureChannel*)channel;
+    UA_AsyncResponse *ar = NULL;
     response->responseHeader.serviceResult =
-        UA_Server_processServiceOperationsAsync(server, session,
-                                                (UA_ServiceOperation)Operation_CallMethodAsync, &context,
-            &request->methodsToCallSize, &UA_TYPES[UA_TYPES_CALLMETHODREQUEST],
-            &response->resultsSize, &UA_TYPES[UA_TYPES_CALLMETHODRESULT]);
+        UA_Server_processServiceOperationsAsync(server, session, requestId,
+                                                request->requestHeader.requestHandle,
+                                                (UA_AsyncServiceOperation)Operation_CallMethodAsync,
+                                                &request->methodsToCallSize,
+                                                &UA_TYPES[UA_TYPES_CALLMETHODREQUEST],
+                                                &response->resultsSize,
+                                                &UA_TYPES[UA_TYPES_CALLMETHODRESULT], &ar);
+
+    if(ar) {
+        if(ar->opCountdown > 0) {
+            /* Move all results to the AsyncResponse. The async operation results
+             * will be overwritten when the workers return results. */
+            ar->response.callResponse = *response;
+            UA_CallResponse_init(response);
+            *finished = false;
+        } else {
+            /* If there is a new AsyncResponse, ensure it has at least one pending
+             * operation */
+            UA_AsyncManager_removeAsyncResponse(&server->asyncManager, ar);
+        }
+    }
 }
 #endif
 

+ 0 - 2
tests/client/check_client_async.c

@@ -19,9 +19,7 @@
 #include "thread_wrapper.h"
 
 UA_Server *server;
-UA_ServerConfig *config;
 UA_Boolean running;
-UA_ServerNetworkLayer nl;
 THREAD_HANDLE server_thread;
 
 THREAD_CALLBACK(serverloop) {

+ 226 - 159
tests/server/check_server_asyncop.c

@@ -5,190 +5,257 @@
    not open a TCP port. */
 
 #include <open62541/server_config_default.h>
-
-#include "server/ua_services.h"
-#include "ua_server_internal.h"
-
-#include <check.h>
-#include <time.h>
-#include <signal.h>
-#include <stdlib.h>
-
-#include <open62541/plugin/log_stdout.h>
 #include <open62541/server.h>
+#include <open62541/client.h>
+#include <open62541/client_config_default.h>
+#include <open62541/client_highlevel_async.h>
+#include <open62541/plugin/log_stdout.h>
 #include "testing_clock.h"
+#include "thread_wrapper.h"
 
-static UA_Server* globalServer;
-static UA_Session session;
+#include <check.h>
 
-START_TEST(InternalTestingQueue) {
-    globalServer->config.asyncOperationTimeout = 2;
-    globalServer->config.maxAsyncOperationQueueSize = 5;
-    UA_UInt32 reqId = 1;
+UA_Boolean running;
+THREAD_HANDLE server_thread;
+static UA_Server *server;
+static size_t clientCounter;
 
-    const UA_AsyncOperationRequest* pRequest = NULL;
-    void *pContext = NULL;
-    UA_AsyncOperationType type;
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: fetch from empty queue");
-    bool rv = UA_Server_getAsyncOperation(globalServer, &type, &pRequest, &pContext);
-    ck_assert_int_eq(rv, UA_FALSE);
-    
-    UA_CallMethodRequest* pRequest1 = UA_CallMethodRequest_new();
-    UA_CallMethodRequest_init(pRequest1);
-    UA_NodeId id = UA_NODEID_NUMERIC(1, 62540);
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: create request queue entry");
-    UA_StatusCode result = UA_Server_SetNextAsyncMethod(globalServer, reqId++, &id, 0, pRequest1);
-    ck_assert_int_eq(result, UA_STATUSCODE_GOOD);
-    ck_assert_int_eq(globalServer->asyncMethodManager.nMQCurSize, 1);
-
-    const UA_AsyncOperationRequest* pRequestWorker = NULL;
-    void *pContextWorker = NULL;
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: fetch from queue");
-    rv = UA_Server_getAsyncOperation(globalServer, &type, &pRequestWorker, &pContextWorker);
-    ck_assert_int_eq(rv, UA_TRUE);
-    ck_assert_int_eq(globalServer->asyncMethodManager.nMQCurSize, 0);
-        
-    UA_AsyncOperationResponse pResponse;
-    UA_CallMethodResult_init(&pResponse.callMethodResult);
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: set result to response queue");
-    UA_Server_setAsyncOperationResult(globalServer, &pResponse, pContextWorker);
-    UA_CallMethodResult_deleteMembers(&pResponse.callMethodResult);
-
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: fetch result from response queue");
-    struct AsyncMethodQueueElement* pResponseServer =
-        UA_Server_GetAsyncMethodResult(&globalServer->asyncMethodManager);
-    ck_assert_ptr_ne(pResponseServer, NULL);
-    UA_Server_DeleteMethodQueueElement(globalServer, pResponseServer);
-    
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: testing queue limit (%zu)", globalServer->config.maxAsyncOperationQueueSize);
-    UA_UInt32 i;
-    for (i = 0; i < globalServer->config.maxAsyncOperationQueueSize + 1; i++) {
-        UA_NodeId idTmp = UA_NODEID_NUMERIC(1, 62541);
-        UA_StatusCode resultTmp = UA_Server_SetNextAsyncMethod(globalServer, reqId++, &idTmp, 0, pRequest1);
-        if (i < globalServer->config.maxAsyncOperationQueueSize)
-            ck_assert_int_eq(resultTmp, UA_STATUSCODE_GOOD);
-        else
-            ck_assert_int_ne(resultTmp, UA_STATUSCODE_GOOD);
-    }
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: queue should not be empty");
-    UA_Server_CheckQueueIntegrity(globalServer,NULL);
-    ck_assert_int_ne(globalServer->asyncMethodManager.nMQCurSize, 0);
-    UA_fakeSleep((UA_Int32)(globalServer->config.asyncOperationTimeout + 1) * 1000);
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: empty queue caused by queue timeout (%ds)", (UA_Int32)globalServer->config.asyncOperationTimeout);
-    /* has to be done twice due to internal queue delete limit */
-    UA_Server_CheckQueueIntegrity(globalServer,NULL);
-    UA_Server_CheckQueueIntegrity(globalServer,NULL);
-    ck_assert_int_eq(globalServer->asyncMethodManager.nMQCurSize, 0);    
-    
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: adding one new entry to empty queue");
-    result = UA_Server_SetNextAsyncMethod(globalServer, reqId++, &id, 0, pRequest1);
-    ck_assert_int_eq(result, UA_STATUSCODE_GOOD);    
-    ck_assert_int_eq(globalServer->asyncMethodManager.nMQCurSize, 1);
-    UA_CallMethodRequest_delete(pRequest1);
+static UA_StatusCode
+methodCallback(UA_Server *serverArg,
+         const UA_NodeId *sessionId, void *sessionHandle,
+         const UA_NodeId *methodId, void *methodContext,
+         const UA_NodeId *objectId, void *objectContext,
+         size_t inputSize, const UA_Variant *input,
+         size_t outputSize, UA_Variant *output) {
+    return UA_STATUSCODE_GOOD;
 }
-END_TEST
-
-
-START_TEST(InternalTestingManager) {    
-    UA_Session_init(&session);
-    session.sessionId = UA_NODEID_NUMERIC(1, 62541);
-    UA_SecureChannel channel;
-    UA_SecureChannel_init(&channel);
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking UA_AsyncOperationManager_createEntry: create CallRequests");
-    for (UA_Int32 i = 1; i < 7; i++) {
-        UA_StatusCode result =
-            UA_AsyncOperationManager_createEntry(&globalServer->asyncMethodManager, globalServer,
-                                              &session.sessionId, channel.securityToken.channelId,
-                                              i, i, UA_ASYNCOPERATIONTYPE_CALL, 1);
-        ck_assert_int_eq(result, UA_STATUSCODE_GOOD);
-    }
-    UA_fakeSleep(121000);
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking UA_AsyncOperationManager_createEntry: empty CallRequest list");
-    UA_AsyncOperationManager_checkTimeouts(globalServer, &globalServer->asyncMethodManager);
-    ck_assert_int_eq(globalServer->asyncMethodManager.currentCount, 0);
+
+static void
+clientReceiveCallback(UA_Client *client, void *userdata,
+                      UA_UInt32 requestId, UA_CallResponse *cr) {
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Received call response");
+    clientCounter++;
 }
-END_TEST
-
-START_TEST(InternalTestingPendingList) {
-    globalServer->config.asyncOperationTimeout = 2;
-    globalServer->config.maxAsyncOperationQueueSize = 5;
-
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking PendingList");
-
-    struct AsyncMethodQueueElement* elem1 = (struct AsyncMethodQueueElement*)UA_calloc(1, sizeof(struct AsyncMethodQueueElement));
-    struct AsyncMethodQueueElement* elem2 = (struct AsyncMethodQueueElement*)UA_calloc(1, sizeof(struct AsyncMethodQueueElement));
-    struct AsyncMethodQueueElement* elem3 = (struct AsyncMethodQueueElement*)UA_calloc(1, sizeof(struct AsyncMethodQueueElement));
-
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking PendingList: Adding 3 elements");
-    UA_Server_AddPendingMethodCall(globalServer, elem1);
-    UA_Server_AddPendingMethodCall(globalServer, elem2);
-    UA_Server_AddPendingMethodCall(globalServer, elem3);
-
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking PendingList: check if element is found");
-    UA_Boolean bFound = UA_Server_IsPendingMethodCall(globalServer, elem2);
-    if (!bFound) {
-        ck_assert_int_eq(bFound, UA_TRUE);
-        UA_Server_RmvPendingMethodCall(globalServer, elem2);
-    }
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking PendingList: remove remaining elements");
-    UA_Server_RmvPendingMethodCall(globalServer, elem1);
-    UA_Server_RmvPendingMethodCall(globalServer, elem3);
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking PendingList: check if removed element is NOT found");
-    bFound = UA_Server_IsPendingMethodCall(globalServer, elem1);
-    if (!bFound) {
-        ck_assert_int_eq(bFound, UA_FALSE);
-        UA_Server_RmvPendingMethodCall(globalServer, elem1);
-    }
-
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking PendingList: queue integrity");
-    UA_Server_AddPendingMethodCall(globalServer, elem1);
-    UA_Server_AddPendingMethodCall(globalServer, elem2);
-    UA_Server_AddPendingMethodCall(globalServer, elem3);
-    UA_fakeSleep((UA_Int32)(globalServer->config.asyncOperationTimeout + 1) * 1000);
-    UA_Server_CheckQueueIntegrity(globalServer, NULL);
-    ck_assert_ptr_eq(globalServer->asyncMethodManager.ua_method_pending_list.sqh_first,NULL);    
-
-    UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking PendingList: global removal/delete");
-    UA_Server_AddPendingMethodCall(globalServer, elem1);
+
+THREAD_CALLBACK(serverloop) {
+    while(running)
+        UA_Server_run_iterate(server, true);
+    return 0;
 }
-END_TEST
 
 static void setup(void) {
-    globalServer = UA_Server_new();
-    UA_ServerConfig *config = UA_Server_getConfig(globalServer);
+    clientCounter = 0;
+    running = true;
+    server = UA_Server_new();
+    UA_ServerConfig *config = UA_Server_getConfig(server);
     UA_ServerConfig_setDefault(config);
+    config->asyncOperationTimeout = 2000.0; /* 2 seconds */
+
+    UA_MethodAttributes methodAttr = UA_MethodAttributes_default;
+    methodAttr.executable = true;
+    methodAttr.userExecutable = true;
+
+    /* Synchronous Method */
+    UA_StatusCode res =
+        UA_Server_addMethodNode(server, UA_NODEID_STRING(1, "method"),
+                            UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
+                            UA_NODEID_NUMERIC(0, UA_NS0ID_HASORDEREDCOMPONENT),
+                            UA_QUALIFIEDNAME(1, "method"),
+                            methodAttr, &methodCallback,
+                            0, NULL, 0, NULL, NULL, NULL);
+    ck_assert_uint_eq(res, UA_STATUSCODE_GOOD);
+
+    /* Asynchronous Method */
+    res = UA_Server_addMethodNode(server, UA_NODEID_STRING(1, "asyncMethod"),
+                            UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
+                            UA_NODEID_NUMERIC(0, UA_NS0ID_HASORDEREDCOMPONENT),
+                            UA_QUALIFIEDNAME(1, "asyncMethod"),
+                            methodAttr, &methodCallback,
+                            0, NULL, 0, NULL, NULL, NULL);
+    ck_assert_uint_eq(res, UA_STATUSCODE_GOOD);
+    res = UA_Server_setMethodNodeAsync(server, UA_NODEID_STRING(1, "asyncMethod"), true);
+    ck_assert_uint_eq(res, UA_STATUSCODE_GOOD);
+
+    UA_Server_run_startup(server);
+    THREAD_CREATE(server_thread, serverloop);
 }
 
 static void teardown(void) {    
-    UA_Server_delete(globalServer);
+    running = false;
+    THREAD_JOIN(server_thread);
+    UA_Server_run_shutdown(server);
+    UA_Server_delete(server);
 }
 
+START_TEST(Async_call) {
+    UA_Client *client = UA_Client_new();
+    UA_ClientConfig *clientConfig = UA_Client_getConfig(client);
+    UA_ClientConfig_setDefault(clientConfig);
+
+    UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    /* Stop the server thread. Iterate manually from now on */
+    running = false;
+    THREAD_JOIN(server_thread);
+
+    /* Call async method, then the sync method.
+     * The sync method returns first. */
+    retval = UA_Client_call_async(client,
+                                  UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
+                                  UA_NODEID_STRING(1, "asyncMethod"),
+                                  0, NULL, clientReceiveCallback, NULL, NULL);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    retval = UA_Client_call_async(client,
+                                  UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
+                                  UA_NODEID_STRING(1, "method"),
+                                  0, NULL, clientReceiveCallback, NULL, NULL);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    /* Receive the answer of the sync call */
+    ck_assert_uint_eq(clientCounter, 0);
+    UA_Server_run_iterate(server, true);
+    UA_Client_run_iterate(client, 0);
+    UA_Server_run_iterate(server, true);
+    UA_Client_run_iterate(client, 0);
+    ck_assert_uint_eq(clientCounter, 1);
+
+    /* Process the async method call for the server */
+    UA_AsyncOperationType aot;
+    const UA_AsyncOperationRequest *request;
+    void *context;
+    UA_Boolean haveAsync = UA_Server_getAsyncOperation(server, &aot, &request, &context);
+    ck_assert_uint_eq(haveAsync, true);
+    UA_AsyncOperationResponse response;
+    UA_CallMethodResult_init(&response.callMethodResult);
+    UA_Server_setAsyncOperationResult(server, &response, context);
+
+    /* Iterate and pick up the async response to be sent out */
+    UA_fakeSleep(1000);
+    UA_Server_run_iterate(server, true);
+
+    UA_fakeSleep(1000);
+    UA_Server_run_iterate(server, true);
+
+    /* Process async responses during 1s */
+    UA_Client_run_iterate(client, 0);
+    ck_assert_uint_eq(clientCounter, 2);
+
+    running = true;
+    THREAD_CREATE(server_thread, serverloop);
+
+    UA_Client_disconnect(client);
+    UA_Client_delete(client);
+} END_TEST
+
+START_TEST(Async_timeout) {
+    UA_Client *client = UA_Client_new();
+    UA_ClientConfig *clientConfig = UA_Client_getConfig(client);
+    UA_ClientConfig_setDefault(clientConfig);
+
+    UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    /* Stop the server thread. Iterate manually from now on */
+    running = false;
+    THREAD_JOIN(server_thread);
+
+    /* Call async method, then the sync method.
+     * The sync method returns first. */
+    retval = UA_Client_call_async(client,
+                                  UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
+                                  UA_NODEID_STRING(1, "asyncMethod"),
+                                  0, NULL, clientReceiveCallback, NULL, NULL);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    /* We expect to receive the timeout not yet*/
+    UA_Server_run_iterate(server, true);
+    UA_Client_run_iterate(client, 0);
+    ck_assert_uint_eq(clientCounter, 0);
+
+    UA_fakeSleep(1000 * 1.5);
+
+    /* We expect to receive the timeout not yet*/
+    UA_Server_run_iterate(server, true);
+    UA_Client_run_iterate(client, 0);
+    ck_assert_uint_eq(clientCounter, 0);
+
+    UA_fakeSleep(1000);
+
+    /* We expect to receive the timeout response */
+    UA_Server_run_iterate(server, true);
+    UA_Client_run_iterate(client, 0);
+    ck_assert_uint_eq(clientCounter, 1);
+
+    running = true;
+    THREAD_CREATE(server_thread, serverloop);
+
+    UA_Client_disconnect(client);
+    UA_Client_delete(client);
+} END_TEST
+
+/* Force a timeout when the operation is checked out with the worker */
+START_TEST(Async_timeout_worker) {
+    UA_Client *client = UA_Client_new();
+    UA_ClientConfig *clientConfig = UA_Client_getConfig(client);
+    UA_ClientConfig_setDefault(clientConfig);
+
+    UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    /* Stop the server thread. Iterate manually from now on */
+    running = false;
+    THREAD_JOIN(server_thread);
+
+    /* Call async method, then the sync method.
+     * The sync method returns first. */
+    retval = UA_Client_call_async(client,
+                                  UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
+                                  UA_NODEID_STRING(1, "asyncMethod"),
+                                  0, NULL, clientReceiveCallback, NULL, NULL);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+    UA_Server_run_iterate(server, true);
+
+    /* Process the async method call for the server */
+    UA_AsyncOperationType aot;
+    const UA_AsyncOperationRequest *request;
+    void *context;
+    UA_Boolean haveAsync = UA_Server_getAsyncOperation(server, &aot, &request, &context);
+    ck_assert_uint_eq(haveAsync, true);
+    UA_AsyncOperationResponse response;
+    UA_CallMethodResult_init(&response.callMethodResult);
+
+    /* Force a timeout */
+    UA_fakeSleep(2500);
+    UA_Server_run_iterate(server, true);
+    UA_Client_run_iterate(client, 0);
+    ck_assert_uint_eq(clientCounter, 1);
+
+    /* Return the late response */
+    UA_Server_setAsyncOperationResult(server, &response, context);
+
+    running = true;
+    THREAD_CREATE(server_thread, serverloop);
+
+    UA_Client_disconnect(client);
+    UA_Client_delete(client);
+} END_TEST
+
 static Suite* method_async_suite(void) {
     /* set up unit test for internal data structures */
     Suite *s = suite_create("Async Method");
 
-    /* UA_Server_PendingList */
-    TCase* tc_pending = tcase_create("PendingList");
-    tcase_add_checked_fixture(tc_pending, setup, NULL);
-    tcase_add_test(tc_pending, InternalTestingPendingList);
-    suite_add_tcase(s, tc_pending);
-
-    /* UA_AsyncOperationManager */
-    TCase* tc_manager = tcase_create("AsyncMethodManager");
-    tcase_add_checked_fixture(tc_manager, NULL, NULL);
-    tcase_add_test(tc_manager, InternalTestingManager);
+    TCase* tc_manager = tcase_create("AsyncMethod");
+    tcase_add_checked_fixture(tc_manager, setup, teardown);
+    tcase_add_test(tc_manager, Async_call);
+    tcase_add_test(tc_manager, Async_timeout);
+    tcase_add_test(tc_manager, Async_timeout_worker);
     suite_add_tcase(s, tc_manager);
     
-    /* UA_Server_MethodQueues */
-    TCase* tc_queue = tcase_create("AsyncMethodQueue");
-    tcase_add_checked_fixture(tc_queue, NULL, teardown);
-    tcase_add_test(tc_queue, InternalTestingQueue);
-    suite_add_tcase(s, tc_queue);    
-    
     return s;
 }
 
-
 int main(void) {
     /* Unit tests for internal data structures for async methods */
     int number_failed = 0;