Prechádzať zdrojové kódy

refactor(server): Remove server/ua_server_methodqueue.h

Moves the queue elements to the AsyncOperationManager
Julius Pfrommer 4 rokov pred
rodič
commit
35f2ca4116

+ 0 - 1
CMakeLists.txt

@@ -626,7 +626,6 @@ set(internal_headers ${PROJECT_SOURCE_DIR}/deps/open62541_queue.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_server_internal.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_services.h
                      ${PROJECT_SOURCE_DIR}/src/client/ua_client_internal.h
-                     ${PROJECT_SOURCE_DIR}/src/server/ua_server_methodqueue.h
 					 ${PROJECT_SOURCE_DIR}/src/server/ua_asyncoperation_manager.h)
 
 # TODO: make client optional

+ 51 - 5
src/server/ua_asyncoperation_manager.h

@@ -21,9 +21,28 @@
 #include "ua_util_internal.h"
 #include "ua_workqueue.h"
 
+_UA_BEGIN_DECLS
+
 #if UA_MULTITHREADING >= 100
 
-_UA_BEGIN_DECLS
+struct AsyncMethodQueueElement {
+    SIMPLEQ_ENTRY(AsyncMethodQueueElement) next;
+    UA_CallMethodRequest m_Request;
+    UA_CallMethodResult	m_Response;
+    UA_DateTime	m_tDispatchTime;
+    UA_UInt32	m_nRequestId;
+    UA_NodeId	m_nSessionId;
+    UA_UInt32	m_nIndex;
+};
+    
+/* Internal Helper to transfer info */
+struct AsyncMethodContextInternal {
+    UA_UInt32 nRequestId;
+    UA_NodeId nSessionId;
+    UA_UInt32 nIndex;
+    const UA_CallRequest* pRequest;
+    UA_SecureChannel* pChannel;
+};
 
 typedef struct asyncOperationEntry {
     LIST_ENTRY(asyncOperationEntry) pointers;
@@ -41,15 +60,29 @@ typedef struct asyncOperationEntry {
 } asyncOperationEntry;
 
 typedef struct UA_AsyncOperationManager {
+    /* Requests / Responses */
     LIST_HEAD(, asyncOperationEntry) asyncOperations;
     UA_UInt32 currentCount;
+
+    /* Operations belonging to a request */
+    UA_UInt32	nMQCurSize;		/* actual size of queue */
+    UA_UInt64	nCBIdIntegrity;	/* id of callback queue check callback  */
+    UA_UInt64	nCBIdResponse;	/* id of callback check for a response  */
+
+    UA_LOCK_TYPE(ua_request_queue_lock)
+    UA_LOCK_TYPE(ua_response_queue_lock)
+    UA_LOCK_TYPE(ua_pending_list_lock)
+
+    SIMPLEQ_HEAD(, AsyncMethodQueueElement) ua_method_request_queue;    
+    SIMPLEQ_HEAD(, AsyncMethodQueueElement) ua_method_response_queue;
+    SIMPLEQ_HEAD(, AsyncMethodQueueElement) ua_method_pending_list;
 } UA_AsyncOperationManager;
 
 void
-UA_AsyncOperationManager_init(UA_AsyncOperationManager *amm);
+UA_AsyncOperationManager_init(UA_AsyncOperationManager *amm, UA_Server *server);
 
 /* Deletes all entries */
-void UA_AsyncOperationManager_clear(UA_AsyncOperationManager *amm);
+void UA_AsyncOperationManager_clear(UA_AsyncOperationManager *amm, UA_Server *server);
 
 UA_StatusCode
 UA_AsyncOperationManager_createEntry(UA_AsyncOperationManager *amm, UA_Server *server,
@@ -69,8 +102,21 @@ UA_AsyncOperationManager_getById(UA_AsyncOperationManager *amm, const UA_UInt32
 void
 UA_AsyncOperationManager_checkTimeouts(UA_Server *server, UA_AsyncOperationManager *amm);
 
-_UA_END_DECLS
+/* Internal definitions for the unit tests */
+void UA_Server_DeleteMethodQueueElement(UA_Server *server, struct AsyncMethodQueueElement *pElem);
+void UA_Server_AddPendingMethodCall(UA_Server* server, struct AsyncMethodQueueElement *pElem);
+void UA_Server_RmvPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem);
+UA_Boolean UA_Server_IsPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem);
+UA_StatusCode UA_Server_SetNextAsyncMethod(UA_Server *server, const UA_UInt32 nRequestId,
+                                           const UA_NodeId *nSessionId, const UA_UInt32 nIndex,
+                                           const UA_CallMethodRequest* pRequest);
+
+void UA_Server_CheckQueueIntegrity(UA_Server *server, void *_);
+struct AsyncMethodQueueElement *
+UA_Server_GetAsyncMethodResult(UA_AsyncOperationManager *amm);
 
-#endif
+#endif /* UA_MULTITHREADING >= 100 */
+
+_UA_END_DECLS
 
 #endif /* UA_ASYNCOPERATION_MANAGER_H_ */

+ 2 - 12
src/server/ua_server.c

@@ -19,10 +19,6 @@
 
 #include "ua_server_internal.h"
 
-#if UA_MULTITHREADING >= 100
-#include "server/ua_server_methodqueue.h"
-#endif
-
 #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
 #include "ua_pubsub_ns0.h"
 #endif
@@ -198,9 +194,7 @@ void UA_Server_delete(UA_Server *server) {
 #endif
 
 #if UA_MULTITHREADING >= 100
-    UA_Server_removeCallback(server, server->nCBIdResponse);
-    UA_Server_MethodQueues_delete(server);
-    UA_AsyncOperationManager_clear(&server->asyncMethodManager);
+    UA_AsyncOperationManager_clear(&server->asyncMethodManager, server);
 #endif
 
     /* Clean up the Admin Session */
@@ -294,11 +288,7 @@ UA_Server_init(UA_Server *server) {
     UA_SessionManager_init(&server->sessionManager, server);
 
 #if UA_MULTITHREADING >= 100
-    UA_AsyncOperationManager_init(&server->asyncMethodManager);
-    UA_Server_MethodQueues_init(server);
-    /* Add a regular callback for for checking responmses using a 50ms interval. */
-    UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_Server_CallMethodResponse, NULL,
-                                  50.0, &server->nCBIdResponse);
+    UA_AsyncOperationManager_init(&server->asyncMethodManager, server);
 #endif
 
     /* Add a regular callback for cleanup and maintenance. With a 10s interval. */

+ 251 - 240
src/server/ua_server_async.c

@@ -12,7 +12,6 @@
  */
 
 #include "ua_asyncoperation_manager.h"
-#include "ua_server_methodqueue.h"
 #include "ua_server_internal.h"
 #include "ua_subscription.h"
 
@@ -22,14 +21,149 @@
 /* AsyncOperationManager */
 /*************************/
 
+/* Checks queue element timeouts */
 void
-UA_AsyncOperationManager_init(UA_AsyncOperationManager *amm) {
+UA_Server_CheckQueueIntegrity(UA_Server *server, void *_) {
+    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
+
+    /* For debugging/testing purposes */
+    if(server->config.asyncOperationTimeout <= 0.0) {
+        UA_AsyncOperationManager_checkTimeouts(server, amm);
+        return;
+    }
+
+    /* To prevent a lockup, we remove a maximum 10% of timed out entries */
+    /* on small queues, we do at least 3 */
+    size_t bMaxRemove = server->config.maxAsyncOperationQueueSize / 10;
+    if(bMaxRemove < 3)
+        bMaxRemove = 3;
+    UA_LOCK(amm->ua_request_queue_lock);
+    /* Check ifentry has been in the queue too long time */
+    while(bMaxRemove-- && !SIMPLEQ_EMPTY(&amm->ua_method_request_queue)) {
+        struct AsyncMethodQueueElement* request_elem = SIMPLEQ_FIRST(&amm->ua_method_request_queue);
+        UA_DateTime tNow = UA_DateTime_now();
+        UA_DateTime tReq = request_elem->m_tDispatchTime;
+        UA_DateTime diff = tNow - tReq;
+        /* queue entry is not older than server->nMQTimeoutSecs, so we stop checking */
+        if(diff <= (UA_DateTime)(server->config.asyncOperationTimeout * UA_DATETIME_MSEC))
+            break;
+
+        /* remove it from the queue */
+        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "UA_Server_CheckQueueIntegrity: Request #%u was removed due to a timeout (%f)",
+                       request_elem->m_nRequestId, server->config.asyncOperationTimeout);
+        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_request_queue, next);
+        amm->nMQCurSize--;
+        /* Notify that we removed this request - e.g. Bad Call Response
+         * (UA_STATUSCODE_BADREQUESTTIMEOUT) */
+        UA_CallMethodResult* result = &request_elem->m_Response;
+        UA_CallMethodResult_clear(result);
+        result->statusCode = UA_STATUSCODE_BADREQUESTTIMEOUT;
+        UA_Server_InsertMethodResponse(server, request_elem->m_nRequestId,
+                                       &request_elem->m_nSessionId,
+                                       request_elem->m_nIndex, result);
+        UA_CallMethodResult_clear(result);
+        UA_Server_DeleteMethodQueueElement(server, request_elem);
+    }
+    UA_UNLOCK(amm->ua_request_queue_lock);
+
+    /* Clear all pending */
+    UA_LOCK(amm->ua_pending_list_lock);
+    /* Check ifentry has been in the pendig list too long time */
+    while(!SIMPLEQ_EMPTY(&amm->ua_method_pending_list)) {
+        struct AsyncMethodQueueElement* request_elem = SIMPLEQ_FIRST(&amm->ua_method_pending_list);
+        UA_DateTime tNow = UA_DateTime_now();
+        UA_DateTime tReq = request_elem->m_tDispatchTime;
+        UA_DateTime diff = tNow - tReq;
+
+        /* list entry is not older than server->nMQTimeoutSecs, so we stop checking */
+        if(diff <= (UA_DateTime)(server->config.asyncOperationTimeout * UA_DATETIME_MSEC))
+            break;
+            
+        /* Remove it from the list */
+        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "UA_Server_CheckQueueIntegrity: Pending request #%u was removed "
+                       "due to a timeout (%f)", request_elem->m_nRequestId,
+                       server->config.asyncOperationTimeout);
+        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_pending_list, next);
+        /* Notify that we removed this request - e.g. Bad Call Response
+         * (UA_STATUSCODE_BADREQUESTTIMEOUT) */
+        UA_CallMethodResult* result = &request_elem->m_Response;
+        UA_CallMethodResult_clear(result);
+        result->statusCode = UA_STATUSCODE_BADREQUESTTIMEOUT;
+        UA_Server_InsertMethodResponse(server, request_elem->m_nRequestId,
+                                       &request_elem->m_nSessionId,
+                                       request_elem->m_nIndex, result);
+        UA_CallMethodResult_clear(result);
+        UA_Server_DeleteMethodQueueElement(server, request_elem);
+    }
+    UA_UNLOCK(amm->ua_pending_list_lock);
+
+    /* Now we check ifwe still have pending CallRequests */
+    UA_AsyncOperationManager_checkTimeouts(server, amm);
+}
+
+void
+UA_AsyncOperationManager_init(UA_AsyncOperationManager *amm, UA_Server *server) {
     memset(amm, 0, sizeof(UA_AsyncOperationManager));
     LIST_INIT(&amm->asyncOperations);
+
+    amm->nMQCurSize = 0;
+
+    SIMPLEQ_INIT(&amm->ua_method_request_queue);
+    SIMPLEQ_INIT(&amm->ua_method_response_queue);
+    SIMPLEQ_INIT(&amm->ua_method_pending_list);
+
+    UA_LOCK_INIT(amm->ua_request_queue_lock);
+    UA_LOCK_INIT(amm->ua_response_queue_lock);
+    UA_LOCK_INIT(amm->ua_pending_list_lock);
+
+    /* Add a regular callback for cleanup and maintenance using a 10s interval. */
+    UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_Server_CheckQueueIntegrity,
+                                  NULL, 10000.0, &amm->nCBIdIntegrity);
+
+    /* Add a regular callback for for checking responmses using a 50ms interval. */
+    UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_Server_CallMethodResponse,
+                                  NULL, 50.0, &amm->nCBIdResponse);
 }
 
 void
-UA_AsyncOperationManager_clear(UA_AsyncOperationManager *amm) {
+UA_AsyncOperationManager_clear(UA_AsyncOperationManager *amm, UA_Server *server) {
+    UA_Server_removeCallback(server, amm->nCBIdResponse);
+    UA_Server_removeCallback(server, amm->nCBIdIntegrity);
+
+    /* Clean up request queue */
+    UA_LOCK(amm->ua_request_queue_lock);
+    while(!SIMPLEQ_EMPTY(&amm->ua_method_request_queue)) {
+        struct AsyncMethodQueueElement* request = SIMPLEQ_FIRST(&amm->ua_method_request_queue);
+        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_request_queue, next);
+        UA_Server_DeleteMethodQueueElement(server, request);
+    }
+    UA_UNLOCK(amm->ua_request_queue_lock);
+
+    /* Clean up response queue */
+    UA_LOCK(amm->ua_response_queue_lock);
+    while(!SIMPLEQ_EMPTY(&amm->ua_method_response_queue)) {
+        struct AsyncMethodQueueElement* response = SIMPLEQ_FIRST(&amm->ua_method_response_queue);
+        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_response_queue, next);
+        UA_Server_DeleteMethodQueueElement(server, response);
+    }
+    UA_UNLOCK(amm->ua_response_queue_lock);
+
+    /* Clear all pending */
+    UA_LOCK(amm->ua_pending_list_lock);
+    while(!SIMPLEQ_EMPTY(&amm->ua_method_pending_list)) {
+        struct AsyncMethodQueueElement* response = SIMPLEQ_FIRST(&amm->ua_method_pending_list);
+        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_pending_list, next);
+        UA_Server_DeleteMethodQueueElement(server, response);
+    }
+    UA_UNLOCK(amm->ua_pending_list_lock);
+
+    /* Delete all locks */
+    UA_LOCK_DESTROY(amm->ua_response_queue_lock);
+    UA_LOCK_DESTROY(amm->ua_request_queue_lock);
+    UA_LOCK_DESTROY(amm->ua_pending_list_lock);
+
     asyncOperationEntry *current, *temp;
     LIST_FOREACH_SAFE(current, &amm->asyncOperations, pointers, temp) {
         UA_AsyncOperationManager_removeEntry(amm, current);
@@ -165,208 +299,52 @@ UA_AsyncOperationManager_checkTimeouts(UA_Server *server, UA_AsyncOperationManag
 /* MethodQueue */
 /***************/
 
-/* Initialize Request Queue */
-void
-UA_Server_MethodQueues_init(UA_Server *server) {
-    server->nMQCurSize = 0;
-
-    UA_LOCK_INIT(server->ua_request_queue_lock);
-    SIMPLEQ_INIT(&server->ua_method_request_queue);
-
-    UA_LOCK_INIT(server->ua_response_queue_lock);
-    SIMPLEQ_INIT(&server->ua_method_response_queue);
-
-    UA_LOCK_INIT(server->ua_pending_list_lock);
-    SIMPLEQ_INIT(&server->ua_method_pending_list);
-
-    /* Add a regular callback for cleanup and maintenance using a 10s interval. */
-    UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_Server_CheckQueueIntegrity,
-        NULL, 10000.0, &server->nCBIdIntegrity);
-}
-
-/* Cleanup and terminate queues */
-void
-UA_Server_MethodQueues_delete(UA_Server *server) {
-    UA_Server_removeCallback(server, server->nCBIdIntegrity);
-
-    /* Clean up request queue */
-    UA_LOCK(server->ua_request_queue_lock);
-    while (!SIMPLEQ_EMPTY(&server->ua_method_request_queue)) {
-        struct AsyncMethodQueueElement* request = SIMPLEQ_FIRST(&server->ua_method_request_queue);
-        SIMPLEQ_REMOVE_HEAD(&server->ua_method_request_queue, next);
-        UA_Server_DeleteMethodQueueElement(server, request);
-    }
-    UA_UNLOCK(server->ua_request_queue_lock);
-
-    /* Clean up response queue */
-    UA_LOCK(server->ua_response_queue_lock);
-    while (!SIMPLEQ_EMPTY(&server->ua_method_response_queue)) {
-        struct AsyncMethodQueueElement* response = SIMPLEQ_FIRST(&server->ua_method_response_queue);
-        SIMPLEQ_REMOVE_HEAD(&server->ua_method_response_queue, next);
-        UA_Server_DeleteMethodQueueElement(server, response);
-    }
-    UA_UNLOCK(server->ua_response_queue_lock);
+/* Enqueue next MethodRequest */
+UA_StatusCode
+UA_Server_SetNextAsyncMethod(UA_Server *server, const UA_UInt32 nRequestId,
+                             const UA_NodeId *nSessionId, const UA_UInt32 nIndex,
+                             const UA_CallMethodRequest *pRequest) {
+    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
 
-    /* Clear all pending */
-    UA_LOCK(server->ua_pending_list_lock);
-    while (!SIMPLEQ_EMPTY(&server->ua_method_pending_list)) {
-        struct AsyncMethodQueueElement* response = SIMPLEQ_FIRST(&server->ua_method_pending_list);
-        SIMPLEQ_REMOVE_HEAD(&server->ua_method_pending_list, next);
-        UA_Server_DeleteMethodQueueElement(server, response);
+    if(server->config.maxAsyncOperationQueueSize != 0 &&
+        amm->nMQCurSize >= server->config.maxAsyncOperationQueueSize) {
+        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "UA_Server_SetNextAsyncMethod: Queue exceeds limit (%d).",
+                       (UA_UInt32)server->config.maxAsyncOperationQueueSize);
+        return UA_STATUSCODE_BADUNEXPECTEDERROR;
     }
-    UA_UNLOCK(server->ua_pending_list_lock);
-
-    /* delete all locks */
-    /* TODO KS: actually we should make sure the worker is not 'hanging' on this lock anymore */
-    Sleep(100);
-    UA_LOCK_DESTROY(server->ua_response_queue_lock);
-    UA_LOCK_DESTROY(server->ua_request_queue_lock);
-    UA_LOCK_DESTROY(server->ua_pending_list_lock);
-}
 
-/* Checks queue element timeouts */
-void
-UA_Server_CheckQueueIntegrity(UA_Server *server, void *data) {
-    /* for debugging/testing purposes */
-    if(server->config.asyncOperationTimeout <= 0.0) {
-        UA_AsyncOperationManager_checkTimeouts(server, &server->asyncMethodManager);
-        return;
+    struct AsyncMethodQueueElement* elem = (struct AsyncMethodQueueElement*)
+        UA_calloc(1, sizeof(struct AsyncMethodQueueElement));
+    if(!elem) {
+        UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "UA_Server_SetNextAsyncMethod: Mem alloc failed.");
+        return UA_STATUSCODE_BADOUTOFMEMORY;
     }
 
-    /* To prevent a lockup, we remove a maximum 10% of timed out entries */
-    /* on small queues, we do at least 3 */
-    size_t bMaxRemove = server->config.maxAsyncOperationQueueSize / 10;
-    if(bMaxRemove < 3)
-        bMaxRemove = 3;
-    UA_Boolean bCheckQueue = UA_TRUE;
-    UA_LOCK(server->ua_request_queue_lock);
-    /* Check ifentry has been in the queue too long time */
-    while(bCheckQueue && bMaxRemove-- && !SIMPLEQ_EMPTY(&server->ua_method_request_queue)) {
-        struct AsyncMethodQueueElement* request_elem = SIMPLEQ_FIRST(&server->ua_method_request_queue);
-        if(request_elem) {
-            UA_DateTime tNow = UA_DateTime_now();
-            UA_DateTime tReq = request_elem->m_tDispatchTime;
-            UA_DateTime diff = tNow - tReq;
-            if(diff > (UA_DateTime)(server->config.asyncOperationTimeout * UA_DATETIME_MSEC)) {
-                /* remove it from the queue */
-                UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                    "UA_Server_CheckQueueIntegrity: Request #%u was removed due to a timeout (%f)",
-                    request_elem->m_nRequestId, server->config.asyncOperationTimeout);
-                SIMPLEQ_REMOVE_HEAD(&server->ua_method_request_queue, next);
-                server->nMQCurSize--;
-                /* Notify that we removed this request - e.g. Bad Call Response (UA_STATUSCODE_BADREQUESTTIMEOUT) */
-                UA_CallMethodResult* result = &request_elem->m_Response;
-                UA_CallMethodResult_clear(result);
-                result->statusCode = UA_STATUSCODE_BADREQUESTTIMEOUT;
-                UA_Server_InsertMethodResponse(server, request_elem->m_nRequestId, &request_elem->m_nSessionId, request_elem->m_nIndex, result);
-                UA_CallMethodResult_clear(result);
-                UA_Server_DeleteMethodQueueElement(server, request_elem);
-            }
-            else {
-                /* queue entry is not older than server->nMQTimeoutSecs, so we stop checking */
-                bCheckQueue = UA_FALSE;
-            }
-        }
+    UA_StatusCode result = UA_CallMethodRequest_copy(pRequest, &elem->m_Request);
+    if(result != UA_STATUSCODE_GOOD) {
+        UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "UA_Server_SetAsyncMethodResult: UA_CallMethodRequest_copy failed.");                
+        UA_free(elem);
+        return result;
     }
-    UA_UNLOCK(server->ua_request_queue_lock);
 
-    /* Clear all pending */
-    bCheckQueue = UA_TRUE;
-    UA_LOCK(server->ua_pending_list_lock);
-    /* Check ifentry has been in the pendig list too long time */
-    while(bCheckQueue && !SIMPLEQ_EMPTY(&server->ua_method_pending_list)) {
-        struct AsyncMethodQueueElement* request_elem = SIMPLEQ_FIRST(&server->ua_method_pending_list);
-        if(request_elem) {
-            UA_DateTime tNow = UA_DateTime_now();
-            UA_DateTime tReq = request_elem->m_tDispatchTime;
-            UA_DateTime diff = tNow - tReq;
-            if(diff > (UA_DateTime)(server->config.asyncOperationTimeout * UA_DATETIME_MSEC)) {
-                /* remove it from the list */
-                UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                    "UA_Server_CheckQueueIntegrity: Pending request #%u was removed due to a timeout (%f)",
-                    request_elem->m_nRequestId, server->config.asyncOperationTimeout);
-                SIMPLEQ_REMOVE_HEAD(&server->ua_method_pending_list, next);
-                /* Notify that we removed this request - e.g. Bad Call Response (UA_STATUSCODE_BADREQUESTTIMEOUT) */
-                UA_CallMethodResult* result = &request_elem->m_Response;
-                UA_CallMethodResult_clear(result);
-                result->statusCode = UA_STATUSCODE_BADREQUESTTIMEOUT;
-                UA_Server_InsertMethodResponse(server, request_elem->m_nRequestId, &request_elem->m_nSessionId, request_elem->m_nIndex, result);
-                UA_CallMethodResult_clear(result);
-                UA_Server_DeleteMethodQueueElement(server, request_elem);
-            }
-            else {
-                /* list entry is not older than server->nMQTimeoutSecs, so we stop checking */
-                bCheckQueue = UA_FALSE;
-            }
-        }
-    }
-    UA_UNLOCK(server->ua_pending_list_lock);
+    UA_CallMethodResult_init(&elem->m_Response);
+    elem->m_nRequestId = nRequestId;
+    elem->m_nSessionId = *nSessionId;
+    elem->m_nIndex = nIndex;
+    elem->m_tDispatchTime = UA_DateTime_now();
 
-    /* Now we check ifwe still have pending CallRequests */
-    UA_AsyncOperationManager_checkTimeouts(server, &server->asyncMethodManager);
-}
+    UA_LOCK(amm->ua_request_queue_lock);
+    SIMPLEQ_INSERT_TAIL(&amm->ua_method_request_queue, elem, next);
+    amm->nMQCurSize++;
+    UA_UNLOCK(amm->ua_request_queue_lock);
 
-/* Enqueue next MethodRequest */
-UA_StatusCode
-UA_Server_SetNextAsyncMethod(UA_Server *server,
-    const UA_UInt32 nRequestId,
-    const UA_NodeId *nSessionId,
-    const UA_UInt32 nIndex,
-    const UA_CallMethodRequest *pRequest) {
-    UA_StatusCode result = UA_STATUSCODE_GOOD;
-
-    if(server->config.maxAsyncOperationQueueSize == 0 ||
-        server->nMQCurSize < server->config.maxAsyncOperationQueueSize) {
-        struct AsyncMethodQueueElement* elem = (struct AsyncMethodQueueElement*)
-            UA_calloc(1, sizeof(struct AsyncMethodQueueElement));
-        if(elem) {
-            UA_CallMethodRequest_init(&elem->m_Request);
-            result = UA_CallMethodRequest_copy(pRequest, &elem->m_Request);
-            if(result != UA_STATUSCODE_GOOD) {
-                UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                    "UA_Server_SetAsyncMethodResult: UA_CallMethodRequest_copy failed.");                
-            }
-
-            elem->m_nRequestId = nRequestId;
-            elem->m_nSessionId = *nSessionId;
-            elem->m_nIndex = nIndex;
-            UA_CallMethodResult_clear(&elem->m_Response);
-            elem->m_tDispatchTime = UA_DateTime_now();
-            UA_LOCK(server->ua_request_queue_lock);
-            SIMPLEQ_INSERT_TAIL(&server->ua_method_request_queue, elem, next);
-            server->nMQCurSize++;
-            if(server->config.asyncOperationNotifyCallback)
-                server->config.asyncOperationNotifyCallback(server);
-            UA_UNLOCK(server->ua_request_queue_lock);
-        } else {
-            /* notify about error */
-            UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                "UA_Server_SetNextAsyncMethod: Mem alloc failed.");
-            result = UA_STATUSCODE_BADOUTOFMEMORY;
-        }
-    } else {
-        /* issue warning */
-        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-            "UA_Server_SetNextAsyncMethod: Queue exceeds limit (%d).",
-                       (UA_UInt32)server->config.maxAsyncOperationQueueSize);
-        result = UA_STATUSCODE_BADUNEXPECTEDERROR;
-    }
-    return result;
-}
+    if(server->config.asyncOperationNotifyCallback)
+        server->config.asyncOperationNotifyCallback(server);
 
-/* Private API */
-/* Get next Method Call Response */
-UA_Boolean
-UA_Server_GetAsyncMethodResult(UA_Server *server, struct AsyncMethodQueueElement **pResponse) {
-    UA_Boolean bRV = UA_FALSE;    
-    UA_LOCK(server->ua_response_queue_lock);
-    if(!SIMPLEQ_EMPTY(&server->ua_method_response_queue)) {
-        *pResponse = SIMPLEQ_FIRST(&server->ua_method_response_queue);
-        SIMPLEQ_REMOVE_HEAD(&server->ua_method_response_queue, next);
-        bRV = UA_TRUE;
-    }
-    UA_UNLOCK(server->ua_response_queue_lock);
-    return bRV;
+    return UA_STATUSCODE_GOOD;
 }
 
 /* Deep delete queue Element - only memory we did allocate */
@@ -378,45 +356,53 @@ UA_Server_DeleteMethodQueueElement(UA_Server *server, struct AsyncMethodQueueEle
 }
 
 void UA_Server_AddPendingMethodCall(UA_Server* server, struct AsyncMethodQueueElement *pElem) {
-    UA_LOCK(server->ua_pending_list_lock);
+    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
+
+    UA_LOCK(amm->ua_pending_list_lock);
     pElem->m_tDispatchTime = UA_DateTime_now(); /* reset timestamp for timeout */
-    SIMPLEQ_INSERT_TAIL(&server->ua_method_pending_list, pElem, next);
-    UA_UNLOCK(server->ua_pending_list_lock);
+    SIMPLEQ_INSERT_TAIL(&amm->ua_method_pending_list, pElem, next);
+    UA_UNLOCK(amm->ua_pending_list_lock);
 }
 
-void UA_Server_RmvPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem) {
-/* Remove element from pending list */
-/* Do NOT delete it because we still need it */
+void
+UA_Server_RmvPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem) {
+    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
+
+    /* Remove element from pending list */
+    /* Do NOT delete it because we still need it */
     struct AsyncMethodQueueElement* current = NULL;
     struct AsyncMethodQueueElement* tmp_iter = NULL;
     struct AsyncMethodQueueElement* previous = NULL;
-    UA_LOCK(server->ua_pending_list_lock);
-    SIMPLEQ_FOREACH_SAFE(current, &server->ua_method_pending_list, next, tmp_iter) {
+    UA_LOCK(amm->ua_pending_list_lock);
+    SIMPLEQ_FOREACH_SAFE(current, &amm->ua_method_pending_list, next, tmp_iter) {
         if(pElem == current) {
             if(previous == NULL)
-                SIMPLEQ_REMOVE_HEAD(&server->ua_method_pending_list, next);
+                SIMPLEQ_REMOVE_HEAD(&amm->ua_method_pending_list, next);
             else
-                SIMPLEQ_REMOVE_AFTER(&server->ua_method_pending_list, previous, next);
+                SIMPLEQ_REMOVE_AFTER(&amm->ua_method_pending_list, previous, next);
             break;
         }
         previous = current;
     }
-    UA_UNLOCK(server->ua_pending_list_lock);
+    UA_UNLOCK(amm->ua_pending_list_lock);
     return;
 }
 
-UA_Boolean UA_Server_IsPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem) {
+UA_Boolean
+UA_Server_IsPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem) {
+    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
+
     UA_Boolean bRV = UA_FALSE;
     struct AsyncMethodQueueElement* current = NULL;
     struct AsyncMethodQueueElement* tmp_iter = NULL;
-    UA_LOCK(server->ua_pending_list_lock);
-    SIMPLEQ_FOREACH_SAFE(current, &server->ua_method_pending_list, next, tmp_iter) {
+    UA_LOCK(amm->ua_pending_list_lock);
+    SIMPLEQ_FOREACH_SAFE(current, &amm->ua_method_pending_list, next, tmp_iter) {
         if(pElem == current) {
             bRV = UA_TRUE;
             break;
         }
     }
-    UA_UNLOCK(server->ua_pending_list_lock);
+    UA_UNLOCK(amm->ua_pending_list_lock);
     return bRV;
 }
 
@@ -428,14 +414,16 @@ UA_Boolean
 UA_Server_getAsyncOperation(UA_Server *server, UA_AsyncOperationType *type,
                             const UA_AsyncOperationRequest **request,
                             void **context) {
+    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
+
     UA_Boolean bRV = UA_FALSE;
     *type = UA_ASYNCOPERATIONTYPE_INVALID;
     struct AsyncMethodQueueElement *elem = NULL;
-    UA_LOCK(server->ua_request_queue_lock);
-    if(!SIMPLEQ_EMPTY(&server->ua_method_request_queue)) {
-        elem = SIMPLEQ_FIRST(&server->ua_method_request_queue);
-        SIMPLEQ_REMOVE_HEAD(&server->ua_method_request_queue, next);
-        server->nMQCurSize--;
+    UA_LOCK(amm->ua_request_queue_lock);
+    if(!SIMPLEQ_EMPTY(&amm->ua_method_request_queue)) {
+        elem = SIMPLEQ_FIRST(&amm->ua_method_request_queue);
+        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_request_queue, next);
+        amm->nMQCurSize--;
         if(elem) {
             *request = (UA_AsyncOperationRequest*)&elem->m_Request;
             *context = (void*)elem;            
@@ -443,10 +431,11 @@ UA_Server_getAsyncOperation(UA_Server *server, UA_AsyncOperationType *type,
         }
         else {
             UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                "UA_Server_GetNextAsyncMethod: elem is a NULL-Pointer.");
+                "UA_amm_GetNextAsyncMethod: elem is a NULL-Pointer.");
         }
     }
-    UA_UNLOCK(server->ua_request_queue_lock);
+    UA_UNLOCK(amm->ua_request_queue_lock);
+
     if(bRV && elem) {
         *type = UA_ASYNCOPERATIONTYPE_CALL;
         UA_Server_AddPendingMethodCall(server, elem);
@@ -465,25 +454,29 @@ UA_Server_setAsyncOperationResult(UA_Server *server,
         /* Dismiss response */
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
             "UA_Server_SetAsyncMethodResult: elem is a NULL-Pointer or not valid anymore.");
-    } else {
-        /* UA_Server_RmvPendingMethodCall MUST be called outside the lock
-        * otherwise we can run into a deadlock */
-        UA_Server_RmvPendingMethodCall(server, elem);
-
-        UA_StatusCode result = UA_CallMethodResult_copy(&response->callMethodResult,
-                                                        &elem->m_Response);
-        if(result != UA_STATUSCODE_GOOD) {
-            UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                "UA_Server_SetAsyncMethodResult: UA_CallMethodResult_copy failed.");
-            /* Add failed CallMethodResult to response queue */
-            UA_CallMethodResult_clear(&elem->m_Response);
-            elem->m_Response.statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
-        }
-        /* Insert response in queue */
-        UA_LOCK(server->ua_response_queue_lock);
-        SIMPLEQ_INSERT_TAIL(&server->ua_method_response_queue, elem, next);
-        UA_UNLOCK(server->ua_response_queue_lock);
+        return;
+    }
+    
+    /* UA_Server_RmvPendingMethodCall MUST be called outside the lock
+     * otherwise we can run into a deadlock */
+    UA_Server_RmvPendingMethodCall(server, elem);
+
+
+    UA_StatusCode result = UA_CallMethodResult_copy(&response->callMethodResult,
+                                                    &elem->m_Response);
+    if(result != UA_STATUSCODE_GOOD) {
+        UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "UA_Server_SetAsyncMethodResult: UA_CallMethodResult_copy failed.");
+        /* Add failed CallMethodResult to response queue */
+        UA_CallMethodResult_clear(&elem->m_Response);
+        elem->m_Response.statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
     }
+
+    /* Insert response in queue */
+    UA_AsyncOperationManager *amm = &server->asyncMethodManager;
+    UA_LOCK(amm->ua_response_queue_lock);
+    SIMPLEQ_INSERT_TAIL(&amm->ua_method_response_queue, elem, next);
+    UA_UNLOCK(amm->ua_response_queue_lock);
 }
 
 /******************/
@@ -518,7 +511,8 @@ UA_Server_InsertMethodResponse(UA_Server *server, const UA_UInt32 nRequestId,
     UA_Session* session = UA_SessionManager_getSessionById(&server->sessionManager, &data->sessionId);
     UA_UNLOCK(server->serviceMutex);
     if(!session) {
-        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, "UA_Server_InsertMethodResponse: Session is gone");
+        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "UA_Server_InsertMethodResponse: Session is gone");
         UA_AsyncOperationManager_removeEntry(&server->asyncMethodManager, data);
         return;
     }
@@ -526,7 +520,8 @@ UA_Server_InsertMethodResponse(UA_Server *server, const UA_UInt32 nRequestId,
     /* Check the channel */
     UA_SecureChannel* channel = session->header.channel;
     if(!channel) {
-        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, "UA_Server_InsertMethodResponse: Channel is gone");
+        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "UA_Server_InsertMethodResponse: Channel is gone");
         UA_AsyncOperationManager_removeEntry(&server->asyncMethodManager, data);
         return;
     }
@@ -541,15 +536,31 @@ UA_Server_InsertMethodResponse(UA_Server *server, const UA_UInt32 nRequestId,
     UA_AsyncOperationManager_removeEntry(&server->asyncMethodManager, data);
 }
 
+/* Get next Method Call Response, user has to call
+ * 'UA_DeleteMethodQueueElement(...)' to cleanup memory */
+struct AsyncMethodQueueElement *
+UA_Server_GetAsyncMethodResult(UA_AsyncOperationManager *amm) {
+     struct AsyncMethodQueueElement *elem = NULL;
+    UA_LOCK(amm->ua_response_queue_lock);
+    if(!SIMPLEQ_EMPTY(&amm->ua_method_response_queue)) {
+        elem = SIMPLEQ_FIRST(&amm->ua_method_response_queue);
+        SIMPLEQ_REMOVE_HEAD(&amm->ua_method_response_queue, next);
+    }
+    UA_UNLOCK(amm->ua_response_queue_lock);
+    return elem;
+}
+
 void
 UA_Server_CallMethodResponse(UA_Server *server, void* data) {
     /* Server fetches Result from queue */
     struct AsyncMethodQueueElement* pResponseServer = NULL;
-    while(UA_Server_GetAsyncMethodResult(server, &pResponseServer)) {
+    while((pResponseServer = UA_Server_GetAsyncMethodResult(&server->asyncMethodManager))) {
         UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
                      "UA_Server_CallMethodResponse: Got Response: OKAY");
-        UA_Server_InsertMethodResponse(server, pResponseServer->m_nRequestId, &pResponseServer->m_nSessionId,
-                                       pResponseServer->m_nIndex, &pResponseServer->m_Response);
+        UA_Server_InsertMethodResponse(server, pResponseServer->m_nRequestId,
+                                       &pResponseServer->m_nSessionId,
+                                       pResponseServer->m_nIndex,
+                                       &pResponseServer->m_Response);
         UA_Server_DeleteMethodQueueElement(server, pResponseServer);
     }
 }

+ 5 - 39
src/server/ua_server_internal.h

@@ -62,28 +62,6 @@ typedef enum {
     UA_SERVERLIFECYLE_RUNNING
 } UA_ServerLifecycle;
 
-#if UA_MULTITHREADING >= 100
-struct AsyncMethodQueueElement {
-        UA_CallMethodRequest m_Request;
-        UA_CallMethodResult	m_Response;
-        UA_DateTime	m_tDispatchTime;
-        UA_UInt32	m_nRequestId;
-        UA_NodeId	m_nSessionId;
-        UA_UInt32	m_nIndex;
-
-        SIMPLEQ_ENTRY(AsyncMethodQueueElement) next;
-    };
-	
-/* Internal Helper to transfer info */
-    struct AsyncMethodContextInternal {
-        UA_UInt32 nRequestId;
-        UA_NodeId nSessionId;
-        UA_UInt32 nIndex;
-        const UA_CallRequest* pRequest;
-        UA_SecureChannel* pChannel;
-    };
-#endif	
-	
 struct UA_Server {
     /* Config */
     UA_ServerConfig config;
@@ -141,24 +119,10 @@ struct UA_Server {
     UA_PubSubManager pubSubManager;
 #endif
 
-
 #if UA_MULTITHREADING >= 100
     UA_LOCK_TYPE(networkMutex)
     UA_LOCK_TYPE(serviceMutex)
-
-	/* Async Method Handling */
-    UA_UInt32	nMQCurSize;		/* actual size of queue */
-    UA_UInt64	nCBIdIntegrity;	/* id of callback queue check callback  */
-    UA_UInt64	nCBIdResponse;	/* id of callback check for a response  */
-
-    UA_LOCK_TYPE(ua_request_queue_lock)
-    UA_LOCK_TYPE(ua_response_queue_lock)
-    UA_LOCK_TYPE(ua_pending_list_lock)
-
-    SIMPLEQ_HEAD(ua_method_request_queue, AsyncMethodQueueElement) ua_method_request_queue;    
-    SIMPLEQ_HEAD(ua_method_response_queue, AsyncMethodQueueElement) ua_method_response_queue;
-    SIMPLEQ_HEAD(ua_method_pending_list, AsyncMethodQueueElement) ua_method_pending_list;
-#endif /* UA_MULTITHREADING >= 100 */
+#endif
 };
 
 /*****************/
@@ -232,12 +196,14 @@ writeWithSession(UA_Server *server, UA_Session *session,
                  const UA_WriteValue *value);
 
 #if UA_MULTITHREADING >= 100
+
 void
 UA_Server_InsertMethodResponse(UA_Server *server, const UA_UInt32 nRequestId,
                                const UA_NodeId* nSessionId, const UA_UInt32 nIndex,
                                const UA_CallMethodResult* response);
-void 
-    UA_Server_CallMethodResponse(UA_Server *server, void* data);
+void
+UA_Server_CallMethodResponse(UA_Server *server, void* data);
+
 #endif
 
 UA_StatusCode

+ 0 - 93
src/server/ua_server_methodqueue.h

@@ -1,93 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
-* License, v. 2.0. If a copy of the MPL was not distributed with this
-* file, You can obtain one at http://mozilla.org/MPL/2.0/.
-*
-*    Copyright 2019 (c) Fraunhofer IOSB (Author: Klaus Schick)
-*/
-
-#ifndef UA_SERVER_METHODQUEUE_H_
-#define	UA_SERVER_METHODQUEUE_H_
-
-#include <open62541/types_generated.h>
-#include "open62541_queue.h"
-#include "ua_server_internal.h"
-
-#if UA_MULTITHREADING >= 100
-
-_UA_BEGIN_DECLS
-
-/* Public API, see server.h  */
-
-/* Private API */
-/* Initialize Request Queue
-*
-* @param server The server object */
-void UA_Server_MethodQueues_init(UA_Server *server);
-
-/* Cleanup and terminate queues
-*
-* @param server The server object */
-void UA_Server_MethodQueues_delete(UA_Server *server);
-
-/* Checks queue element timeouts
-*
-* @param server The server object */
-void UA_Server_CheckQueueIntegrity(UA_Server *server, void *data);
-
-/* Deep delete of queue element (except UA_CallMethodRequest which is managed by the caller)
-*
-* @param server The server object
-* @param pElem Pointer to queue element */
-void UA_Server_DeleteMethodQueueElement(UA_Server *server, struct AsyncMethodQueueElement *pElem);
-
-
-/* Internal functions, declared here to be accessible by unit test(s) */
-
-/* Add element to pending list 
-*
-* @param server The server object
-* @param pElem Pointer to queue element */
-void UA_Server_AddPendingMethodCall(UA_Server* server, struct AsyncMethodQueueElement *pElem);
-
-/* Remove element from pending list
-*
-* @param server The server object
-* @param pElem Pointer to queue element */
-void UA_Server_RmvPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem);
-
-/* check if element is in pending list
-*
-* @param server The server object
-* @param pElem Pointer to queue element
-* @return UA_TRUE if element was found, UA_FALSE else */
-UA_Boolean UA_Server_IsPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem);
-
-
-/* Enqueue next MethodRequest
-*
-* @param server The server object
-* @param Pointer to UA_NodeId (pSessionIdentifier)
-* @param UA_UInt32 (unique requestId/server)
-* @param Index of request within UA_Callrequest
-* @param Pointer to UA_CallMethodRequest
-* @return UA_STATUSCODE_GOOD if request was enqueue,
-    UA_STATUSCODE_BADUNEXPECTEDERROR if queue full
-*	UA_STATUSCODE_BADOUTOFMEMORY if no memory available */
-UA_StatusCode UA_Server_SetNextAsyncMethod(UA_Server *server,
-    const UA_UInt32 nRequestId,
-    const UA_NodeId *nSessionId,
-    const UA_UInt32 nIndex,
-    const UA_CallMethodRequest* pRequest);
-
-/* Get next Method Call Response, user has to call 'UA_DeleteMethodQueueElement(...)' to cleanup memory
-*
-* @param server The server object
-* @param Receives pointer to AsyncMethodQueueElement
-* @return UA_FALSE if queue is empty, UA_TRUE else */
-UA_Boolean UA_Server_GetAsyncMethodResult(UA_Server *server, struct AsyncMethodQueueElement **pResponse);
-
-_UA_END_DECLS
-
-#endif /* !UA_MULTITHREADING >= 100 */
-
-#endif /* !UA_SERVER_METHODQUEUE_H_ */

+ 0 - 2
src/server/ua_services_method.c

@@ -17,8 +17,6 @@
 
 #ifdef UA_ENABLE_METHODCALLS /* conditional compilation */
 
-#include "ua_server_methodqueue.h"
-
 static const UA_VariableNode *
 getArgumentsVariableNode(UA_Server *server, const UA_MethodNode *ofMethod,
                          UA_String withBrowseName) {

+ 9 - 15
tests/server/check_server_asyncop.c

@@ -8,7 +8,6 @@
 
 #include "server/ua_services.h"
 #include "ua_server_internal.h"
-#include "ua_server_methodqueue.h"
 
 #include <check.h>
 #include <time.h>
@@ -22,7 +21,6 @@
 static UA_Server* globalServer;
 static UA_Session session;
 
-
 START_TEST(InternalTestingQueue) {
     globalServer->config.asyncOperationTimeout = 2;
     globalServer->config.maxAsyncOperationQueueSize = 5;
@@ -41,14 +39,14 @@ START_TEST(InternalTestingQueue) {
     UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: create request queue entry");
     UA_StatusCode result = UA_Server_SetNextAsyncMethod(globalServer, reqId++, &id, 0, pRequest1);
     ck_assert_int_eq(result, UA_STATUSCODE_GOOD);
-    ck_assert_int_eq(globalServer->nMQCurSize, 1);
+    ck_assert_int_eq(globalServer->asyncMethodManager.nMQCurSize, 1);
 
     const UA_AsyncOperationRequest* pRequestWorker = NULL;
     void *pContextWorker = NULL;
     UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: fetch from queue");
     rv = UA_Server_getAsyncOperation(globalServer, &type, &pRequestWorker, &pContextWorker);
     ck_assert_int_eq(rv, UA_TRUE);
-    ck_assert_int_eq(globalServer->nMQCurSize, 0);
+    ck_assert_int_eq(globalServer->asyncMethodManager.nMQCurSize, 0);
         
     UA_AsyncOperationResponse pResponse;
     UA_CallMethodResult_init(&pResponse.callMethodResult);
@@ -57,9 +55,9 @@ START_TEST(InternalTestingQueue) {
     UA_CallMethodResult_deleteMembers(&pResponse.callMethodResult);
 
     UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: fetch result from response queue");
-    struct AsyncMethodQueueElement* pResponseServer = NULL;
-    rv = UA_Server_GetAsyncMethodResult(globalServer, &pResponseServer);
-    ck_assert_int_eq(rv, UA_TRUE);
+    struct AsyncMethodQueueElement* pResponseServer =
+        UA_Server_GetAsyncMethodResult(&globalServer->asyncMethodManager);
+    ck_assert_ptr_ne(pResponseServer, NULL);
     UA_Server_DeleteMethodQueueElement(globalServer, pResponseServer);
     
     UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: testing queue limit (%zu)", globalServer->config.maxAsyncOperationQueueSize);
@@ -74,18 +72,18 @@ START_TEST(InternalTestingQueue) {
     }
     UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: queue should not be empty");
     UA_Server_CheckQueueIntegrity(globalServer,NULL);
-    ck_assert_int_ne(globalServer->nMQCurSize, 0);
+    ck_assert_int_ne(globalServer->asyncMethodManager.nMQCurSize, 0);
     UA_fakeSleep((UA_Int32)(globalServer->config.asyncOperationTimeout + 1) * 1000);
     UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: empty queue caused by queue timeout (%ds)", (UA_Int32)globalServer->config.asyncOperationTimeout);
     /* has to be done twice due to internal queue delete limit */
     UA_Server_CheckQueueIntegrity(globalServer,NULL);
     UA_Server_CheckQueueIntegrity(globalServer,NULL);
-    ck_assert_int_eq(globalServer->nMQCurSize, 0);    
+    ck_assert_int_eq(globalServer->asyncMethodManager.nMQCurSize, 0);    
     
     UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking queue: adding one new entry to empty queue");
     result = UA_Server_SetNextAsyncMethod(globalServer, reqId++, &id, 0, pRequest1);
     ck_assert_int_eq(result, UA_STATUSCODE_GOOD);    
-    ck_assert_int_eq(globalServer->nMQCurSize, 1);
+    ck_assert_int_eq(globalServer->asyncMethodManager.nMQCurSize, 1);
     UA_CallMethodRequest_delete(pRequest1);
 }
 END_TEST
@@ -148,14 +146,10 @@ START_TEST(InternalTestingPendingList) {
     UA_Server_AddPendingMethodCall(globalServer, elem3);
     UA_fakeSleep((UA_Int32)(globalServer->config.asyncOperationTimeout + 1) * 1000);
     UA_Server_CheckQueueIntegrity(globalServer, NULL);
-    ck_assert_ptr_eq(globalServer->ua_method_pending_list.sqh_first,NULL);    
+    ck_assert_ptr_eq(globalServer->asyncMethodManager.ua_method_pending_list.sqh_first,NULL);    
 
     UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking PendingList: global removal/delete");
     UA_Server_AddPendingMethodCall(globalServer, elem1);
-    /* remove all entries and delete queues */
-    UA_Server_MethodQueues_delete(globalServer);
-    /* re-init queues */    
-    UA_Server_MethodQueues_init(globalServer);
 }
 END_TEST