Browse Source

refactor(server): Prepare AsyncMethodManager for read/write operations

Julius Pfrommer 4 years ago
parent
commit
6a91f9d46f

+ 23 - 22
src/server/ua_asyncmethod_manager.c

@@ -20,22 +20,22 @@
 void
 UA_AsyncMethodManager_init(UA_AsyncMethodManager *amm) {
     memset(amm, 0, sizeof(UA_AsyncMethodManager));
-    LIST_INIT(&amm->asyncmethods);
+    LIST_INIT(&amm->asyncOperations);
 }
 
 void
 UA_AsyncMethodManager_clear(UA_AsyncMethodManager *amm) {
-    asyncmethod_list_entry *current, *temp;
-    LIST_FOREACH_SAFE(current, &amm->asyncmethods, pointers, temp) {
+    asyncOperationEntry *current, *temp;
+    LIST_FOREACH_SAFE(current, &amm->asyncOperations, pointers, temp) {
         UA_AsyncMethodManager_removeEntry(amm, current);
     }
 }
 
-asyncmethod_list_entry *
+asyncOperationEntry *
 UA_AsyncMethodManager_getById(UA_AsyncMethodManager *amm, const UA_UInt32 requestId,
                               const UA_NodeId *sessionId) {
-    asyncmethod_list_entry *current = NULL;
-    LIST_FOREACH(current, &amm->asyncmethods, pointers) {
+    asyncOperationEntry *current = NULL;
+    LIST_FOREACH(current, &amm->asyncOperations, pointers) {
         if(current->requestId == requestId &&
            UA_NodeId_equal(&current->sessionId, sessionId))
             return current;
@@ -47,9 +47,10 @@ UA_StatusCode
 UA_AsyncMethodManager_createEntry(UA_AsyncMethodManager *amm, UA_Server *server,
                                   const UA_NodeId *sessionId, const UA_UInt32 channelId,
                                   const UA_UInt32 requestId, const UA_UInt32 requestHandle,
-                                  const UA_DataType *responseType, const UA_UInt32 nCountdown) {
-    asyncmethod_list_entry *newentry = (asyncmethod_list_entry*)
-        UA_calloc(1, sizeof(asyncmethod_list_entry));
+                                  const UA_AsyncOperationType operationType,
+                                  const UA_UInt32 nCountdown) {
+    asyncOperationEntry *newentry = (asyncOperationEntry*)
+        UA_calloc(1, sizeof(asyncOperationEntry));
     if(!newentry) {
         UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
                      "UA_AsyncMethodManager_createEntry: Mem alloc failed.");
@@ -67,14 +68,13 @@ UA_AsyncMethodManager_createEntry(UA_AsyncMethodManager *amm, UA_Server *server,
     UA_atomic_addUInt32(&amm->currentCount, 1);
     newentry->requestId = requestId;
     newentry->requestHandle = requestHandle;
-    newentry->responseType = responseType;
     newentry->nCountdown = nCountdown;
     newentry->dispatchTime = UA_DateTime_now();
-    UA_CallResponse_init(&newentry->response);
-    newentry->response.results = (UA_CallMethodResult*)
+    UA_CallResponse_init(&newentry->response.callResponse);
+    newentry->response.callResponse.results = (UA_CallMethodResult*)
         UA_calloc(nCountdown, sizeof(UA_CallMethodResult));
-    newentry->response.resultsSize = nCountdown;
-    if(newentry->response.results == NULL) {
+    newentry->response.callResponse.resultsSize = nCountdown;
+    if(newentry->response.callResponse.results == NULL) {
         UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
                      "UA_AsyncMethodManager_createEntry: Mem alloc failed.");
         UA_free(newentry);
@@ -84,9 +84,9 @@ UA_AsyncMethodManager_createEntry(UA_AsyncMethodManager *amm, UA_Server *server,
     /* Set the StatusCode to timeout by default. Will be overwritten when the
      * result is set. */
     for(size_t i = 0; i < nCountdown; i++)
-        newentry->response.results[i].statusCode = UA_STATUSCODE_BADTIMEOUT;
+        newentry->response.callResponse.results[i].statusCode = UA_STATUSCODE_BADTIMEOUT;
 
-    LIST_INSERT_HEAD(&amm->asyncmethods, newentry, pointers);
+    LIST_INSERT_HEAD(&amm->asyncOperations, newentry, pointers);
 
     UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
                  "UA_AsyncMethodManager_createEntry: Chan: %u. Req# %u", channelId, requestId);
@@ -97,11 +97,11 @@ UA_AsyncMethodManager_createEntry(UA_AsyncMethodManager *amm, UA_Server *server,
 /* Remove entry and free all allocated data */
 void
 UA_AsyncMethodManager_removeEntry(UA_AsyncMethodManager *amm,
-                                  asyncmethod_list_entry *current) {
+                                  asyncOperationEntry *current) {
     UA_assert(current);
     LIST_REMOVE(current, pointers);
     UA_atomic_subUInt32(&amm->currentCount, 1);
-    UA_CallResponse_clear(&current->response);
+    UA_CallResponse_clear(&current->response.callResponse);
     UA_NodeId_clear(&current->sessionId);
     UA_free(current);
 }
@@ -109,9 +109,9 @@ UA_AsyncMethodManager_removeEntry(UA_AsyncMethodManager *amm,
 /* Check if CallRequest is waiting way too long (120s) */
 void
 UA_AsyncMethodManager_checkTimeouts(UA_Server *server, UA_AsyncMethodManager *amm) {
-    asyncmethod_list_entry* current = NULL;
-    asyncmethod_list_entry* current_tmp = NULL;
-    LIST_FOREACH_SAFE(current, &amm->asyncmethods, pointers, current_tmp) {
+    asyncOperationEntry* current = NULL;
+    asyncOperationEntry* current_tmp = NULL;
+    LIST_FOREACH_SAFE(current, &amm->asyncOperations, pointers, current_tmp) {
         UA_DateTime tNow = UA_DateTime_now();
         UA_DateTime tReq = current->dispatchTime;
         UA_DateTime diff = tNow - tReq;
@@ -147,7 +147,8 @@ UA_AsyncMethodManager_checkTimeouts(UA_Server *server, UA_AsyncMethodManager *am
 
         /* Okay, here we go, send the UA_CallResponse */
         sendResponse(session->header.channel, current->requestId, current->requestHandle,
-                     (UA_ResponseHeader*)&current->response.responseHeader, current->responseType);
+                     (UA_ResponseHeader*)&current->response.callResponse.responseHeader,
+                     &UA_TYPES[UA_TYPES_CALLRESPONSE]);
         UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
                      "UA_Server_SendResponse: Response for Req# %u sent", current->requestId);
     remove:

+ 16 - 10
src/server/ua_asyncmethod_manager.h

@@ -25,19 +25,23 @@
 
 _UA_BEGIN_DECLS
 
-typedef struct asyncmethod_list_entry {
-    LIST_ENTRY(asyncmethod_list_entry) pointers;
+typedef struct asyncOperationEntry {
+    LIST_ENTRY(asyncOperationEntry) pointers;
     UA_UInt32 requestId;
     UA_NodeId sessionId;
     UA_UInt32 requestHandle;
-    const UA_DataType *responseType;
     UA_DateTime	dispatchTime;       /* Creation time */
     UA_UInt32 nCountdown;			/* Counter for open UA_CallResults */
-    UA_CallResponse response;		/* The 'collected' CallResponse for our CallMethodResponse(s) */
-} asyncmethod_list_entry;
+    UA_AsyncOperationType operationType;
+    union {
+        UA_CallResponse callResponse;
+        UA_ReadResponse readResponse;
+        UA_WriteResponse writeResponse;
+    } response;
+} asyncOperationEntry;
 
 typedef struct UA_AsyncMethodManager {
-    LIST_HEAD(asyncmethod_list, asyncmethod_list_entry) asyncmethods; // doubly-linked list of CallRequests/Responses
+    LIST_HEAD(, asyncOperationEntry) asyncOperations;
     UA_UInt32 currentCount;
 } UA_AsyncMethodManager;
 
@@ -51,14 +55,16 @@ UA_StatusCode
 UA_AsyncMethodManager_createEntry(UA_AsyncMethodManager *amm, UA_Server *server,
                                   const UA_NodeId *sessionId, const UA_UInt32 channelId,
                                   const UA_UInt32 requestId, const UA_UInt32 requestHandle,
-                                  const UA_DataType *responseType, const UA_UInt32 nCountdown);
+                                  const UA_AsyncOperationType operationType,
+                                  const UA_UInt32 nCountdown);
 
 /* The pointers amm and current must not be NULL */
 void
-UA_AsyncMethodManager_removeEntry(UA_AsyncMethodManager *amm, asyncmethod_list_entry *current);
+UA_AsyncMethodManager_removeEntry(UA_AsyncMethodManager *amm, asyncOperationEntry *current);
 
-asyncmethod_list_entry*
-UA_AsyncMethodManager_getById(UA_AsyncMethodManager *amm, const UA_UInt32 requestId, const UA_NodeId *sessionId);
+asyncOperationEntry *
+UA_AsyncMethodManager_getById(UA_AsyncMethodManager *amm, const UA_UInt32 requestId,
+                              const UA_NodeId *sessionId);
 
 void
 UA_AsyncMethodManager_checkTimeouts(UA_Server *server, UA_AsyncMethodManager *amm);

+ 4 - 3
src/server/ua_server.c

@@ -510,7 +510,7 @@ UA_Server_InsertMethodResponse(UA_Server *server, const UA_UInt32 nRequestId,
                                const UA_NodeId *nSessionId, const UA_UInt32 nIndex,
                                const UA_CallMethodResult *response) {
     /* Grab the open Request, so we can continue to construct the response */
-    asyncmethod_list_entry *data =
+    asyncOperationEntry *data =
         UA_AsyncMethodManager_getById(&server->asyncMethodManager, nRequestId, nSessionId);
     if(!data) {
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
@@ -520,7 +520,7 @@ UA_Server_InsertMethodResponse(UA_Server *server, const UA_UInt32 nRequestId,
     }
 
     /* Add UA_CallMethodResult to UA_CallResponse */
-    UA_CallResponse* pResponse = &data->response;
+    UA_CallResponse* pResponse = &data->response.callResponse;
     UA_CallMethodResult_copy(response, pResponse->results + nIndex);
 
     /* Reduce the number of open results. Are we done yet with all requests? */
@@ -548,7 +548,8 @@ UA_Server_InsertMethodResponse(UA_Server *server, const UA_UInt32 nRequestId,
 
     /* Okay, here we go, send the UA_CallResponse */
     sendResponse(channel, data->requestId, data->requestHandle,
-                 (UA_ResponseHeader*)&data->response.responseHeader, data->responseType);
+                 (UA_ResponseHeader*)&data->response.callResponse.responseHeader,
+                 &UA_TYPES[UA_TYPES_CALLRESPONSE]);
     UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
                  "UA_Server_SendResponse: Response for Req# %u sent", data->requestId);
     /* Remove this job from the UA_AsyncMethodManager */

+ 2 - 1
src/server/ua_server_binary.c

@@ -556,7 +556,8 @@ processMSGDecoded(UA_Server *server, UA_SecureChannel *channel, UA_UInt32 reques
         responseHeader->serviceResult =
             UA_AsyncMethodManager_createEntry(&server->asyncMethodManager, server,
                                               &session->sessionId, channel->securityToken.channelId,
-                                              requestId, requestHeader->requestHandle, responseType,
+                                              requestId, requestHeader->requestHandle,
+                                              UA_ASYNCOPERATIONTYPE_CALL,
                                               (UA_UInt32)((const UA_CallRequest*)requestHeader)->methodsToCallSize);
         if(responseHeader->serviceResult == UA_STATUSCODE_GOOD)
             Service_CallAsync(server, session, channel, requestId,

+ 1 - 2
tests/server/check_server_asyncop.c

@@ -97,12 +97,11 @@ START_TEST(InternalTestingManager) {
     UA_SecureChannel channel;
     UA_SecureChannel_init(&channel);
     UA_LOG_INFO(&globalServer->config.logger, UA_LOGCATEGORY_SERVER, "* Checking UA_AsyncMethodManager_createEntry: create CallRequests");
-    UA_DataType dataType = UA_TYPES[UA_TYPES_CALLMETHODREQUEST];
     for (UA_Int32 i = 1; i < 7; i++) {
         UA_StatusCode result =
             UA_AsyncMethodManager_createEntry(&globalServer->asyncMethodManager, globalServer,
                                               &session.sessionId, channel.securityToken.channelId,
-                                              i, i, &dataType, 1);
+                                              i, i, UA_ASYNCOPERATIONTYPE_CALL, 1);
         ck_assert_int_eq(result, UA_STATUSCODE_GOOD);
     }
     UA_fakeSleep(121000);