Parcourir la source

SecureChannel: Put all messages/chunks in a tailq for in-order processing

There were problems when a client renewed a SecureChannel when a second
message in the old network-buffer was not yet fully processed.
Julius Pfrommer il y a 5 ans
Parent
commit
fd1061e444

+ 7 - 14
src/client/ua_client.c

@@ -382,7 +382,7 @@ processAsyncResponse(UA_Client *client, UA_UInt32 requestId, const UA_NodeId *re
 /* Processes the received service response. Either with an async callback or by
  * decoding the message and returning it "upwards" in the
  * SyncResponseDescription. */
-static UA_StatusCode
+static void
 processServiceResponse(void *application, UA_SecureChannel *channel,
                        UA_MessageType messageType, UA_UInt32 requestId,
                        const UA_ByteString *message) {
@@ -393,17 +393,9 @@ processServiceResponse(void *application, UA_SecureChannel *channel,
        messageType != UA_MESSAGETYPE_MSG) {
         UA_LOG_TRACE_CHANNEL(rd->client->config.logger, channel,
                              "Invalid message type");
-        return UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
+        return;
     }
 
-    /* Has the SecureChannel timed out?
-     * TODO: Solve this for client and server together */
-    if(rd->client->state >= UA_CLIENTSTATE_SECURECHANNEL &&
-       (channel->securityToken.createdAt +
-        (channel->securityToken.revisedLifetime * UA_DATETIME_MSEC))
-       < UA_DateTime_nowMonotonic())
-        return UA_STATUSCODE_BADSECURECHANNELCLOSED;
-
     /* Forward declaration for the goto */
     UA_NodeId expectedNodeId = UA_NODEID_NULL;
 
@@ -464,16 +456,17 @@ finish:
             respHeader->serviceResult = retval;
         }
     }
-    return retval;
 }
 
 /* Forward complete chunks directly to the securechannel */
 static UA_StatusCode
 client_processChunk(void *application, UA_Connection *connection, UA_ByteString *chunk) {
     SyncResponseDescription *rd = (SyncResponseDescription*)application;
-    return UA_SecureChannel_processChunk(&rd->client->channel, chunk,
-                                         processServiceResponse,
-                                         rd);
+    UA_StatusCode retval = UA_SecureChannel_decryptAddChunk(&rd->client->channel, chunk);
+    if(retval != UA_STATUSCODE_GOOD)
+        return retval;
+    UA_SecureChannel_processCompleteMessages(&rd->client->channel, rd, processServiceResponse);
+    return UA_SecureChannel_persistIncompleteMessages(&rd->client->channel);
 }
 
 /* Receive and process messages until a synchronous message arrives or the

+ 37 - 22
src/client/ua_client_connect_async.c

@@ -152,15 +152,17 @@ sendHELMessage(UA_Client *client) {
     return client->connectStatus;
 }
 
-static UA_StatusCode
+static void
 processDecodedOPNResponseAsync(void *application, UA_SecureChannel *channel,
                                 UA_MessageType messageType,
                                 UA_UInt32 requestId,
                                 const UA_ByteString *message) {
     /* Does the request id match? */
     UA_Client *client = (UA_Client*)application;
-    if (requestId != client->requestId)
-        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    if(requestId != client->requestId) {
+        UA_Client_disconnect(client);
+        return;
+    }
 
     /* Is the content of the expected type? */
     size_t offset = 0;
@@ -169,11 +171,14 @@ processDecodedOPNResponseAsync(void *application, UA_SecureChannel *channel,
             0, UA_TYPES[UA_TYPES_OPENSECURECHANNELRESPONSE].binaryEncodingId);
     UA_StatusCode retval = UA_NodeId_decodeBinary(message, &offset,
                                                   &responseId);
-    if(retval != UA_STATUSCODE_GOOD)
-        return retval;
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_Client_disconnect(client);
+        return;
+    }
     if(!UA_NodeId_equal(&responseId, &expectedId)) {
         UA_NodeId_deleteMembers(&responseId);
-        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+        UA_Client_disconnect(client);
+        return;
     }
     UA_NodeId_deleteMembers (&responseId);
 
@@ -181,8 +186,10 @@ processDecodedOPNResponseAsync(void *application, UA_SecureChannel *channel,
     UA_OpenSecureChannelResponse response;
     retval = UA_OpenSecureChannelResponse_decodeBinary(message, &offset,
                                                        &response);
-    if(retval != UA_STATUSCODE_GOOD)
-        return retval;
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_Client_disconnect(client);
+        return;
+    }
 
     /* Response.securityToken.revisedLifetime is UInt32 we need to cast it to
      * DateTime=Int64 we take 75% of lifetime to start renewing as described in
@@ -198,29 +205,37 @@ processDecodedOPNResponseAsync(void *application, UA_SecureChannel *channel,
     client->channel.remoteNonce = response.serverNonce;
     UA_ResponseHeader_deleteMembers(&response.responseHeader); /* the other members were moved */
     if(client->channel.state == UA_SECURECHANNELSTATE_OPEN)
-        UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
-                     "SecureChannel renewed");
+        UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL, "SecureChannel renewed");
     else
-        UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
-                     "SecureChannel opened");
+        UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL, "SecureChannel opened");
     client->channel.state = UA_SECURECHANNELSTATE_OPEN;
-    return UA_STATUSCODE_GOOD;
+
+    if(client->state < UA_CLIENTSTATE_SECURECHANNEL)
+        setClientState(client, UA_CLIENTSTATE_SECURECHANNEL);
 }
 
-static UA_StatusCode processOPNResponse
-    (void *application, UA_Connection *connection,
-                    UA_ByteString *chunk) {
+static UA_StatusCode
+processOPNResponse(void *application, UA_Connection *connection,
+                   UA_ByteString *chunk) {
     UA_Client *client = (UA_Client*) application;
-    UA_StatusCode retval = UA_SecureChannel_processChunk (
-            &client->channel, chunk, processDecodedOPNResponseAsync, client);
+    UA_StatusCode retval = UA_SecureChannel_decryptAddChunk(&client->channel, chunk);
     client->connectStatus = retval;
     if(retval != UA_STATUSCODE_GOOD) {
+        UA_Client_disconnect(client);
         return retval;
     }
-    setClientState(client, UA_CLIENTSTATE_SECURECHANNEL);
+    UA_SecureChannel_processCompleteMessages(&client->channel, client, processDecodedOPNResponseAsync);
+    
+    if(client->state < UA_CLIENTSTATE_SECURECHANNEL)
+        return UA_STATUSCODE_BADSECURECHANNELCLOSED;
+
+    retval |= UA_SecureChannel_persistIncompleteMessages(&client->channel);
     retval |= UA_SecureChannel_generateNewKeys(&client->channel);
-    if(retval != UA_STATUSCODE_GOOD)
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_Client_disconnect(client);
         return retval;
+    }
+
     /* Following requests and responses */
     UA_UInt32 reqId;
     if(client->endpointsHandshake)
@@ -228,9 +243,9 @@ static UA_StatusCode processOPNResponse
     else
         retval = requestSession (client, &reqId);
 
-    client->connectStatus = retval;
+    if(retval != UA_STATUSCODE_GOOD)
+        UA_Client_disconnect(client);
     return retval;
-
 }
 
 /* OPN messges to renew the channel are sent asynchronous */

+ 27 - 51
src/server/ua_server_binary.c

@@ -601,7 +601,7 @@ processMSG(UA_Server *server, UA_SecureChannel *channel,
 }
 
 /* Takes decoded messages starting at the nodeid of the content type. */
-static UA_StatusCode
+static void
 processSecureChannelMessage(void *application, UA_SecureChannel *channel,
                             UA_MessageType messagetype, UA_UInt32 requestId,
                             const UA_ByteString *message) {
@@ -626,7 +626,12 @@ processSecureChannelMessage(void *application, UA_SecureChannel *channel,
         retval = UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
         break;
     }
-    return retval;
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_LOG_INFO_CHANNEL(server->config.logger, channel,
+                            "Processing the message failed with StatusCode %s."
+                            "Closing the channel", UA_StatusCode_name(retval));
+        Service_CloseSecureChannel(server, channel);
+    }
 }
 
 static UA_StatusCode
@@ -709,11 +714,12 @@ processCompleteChunkWithoutChannel(UA_Server *server, UA_Connection *connection,
         if(retval != UA_STATUSCODE_GOOD)
             break;
 
-        retval = UA_SecureChannel_processChunk(connection->channel, message,
-                                               processSecureChannelMessage,
-                                               server);
+        retval = UA_SecureChannel_decryptAddChunk(connection->channel, message);
         if(retval != UA_STATUSCODE_GOOD)
             break;
+
+        UA_SecureChannel_processCompleteMessages(connection->channel, server,
+                                                 processSecureChannelMessage);
         break;
     }
     default:
@@ -727,23 +733,20 @@ processCompleteChunkWithoutChannel(UA_Server *server, UA_Connection *connection,
 }
 
 static UA_StatusCode
-processCompleteChunk(void *const application,
-                     UA_Connection *const connection,
-                     UA_ByteString *const chunk) {
-    UA_Server *const server = (UA_Server*)application;
+processCompleteChunk(void *const application, UA_Connection *connection,
+                     UA_ByteString *chunk) {
+    UA_Server *server = (UA_Server*)application;
 #ifdef UA_DEBUG_DUMP_PKGS_FILE
     UA_debug_dumpCompleteChunk(server, connection, chunk);
 #endif
     if(!connection->channel)
         return processCompleteChunkWithoutChannel(server, connection, chunk);
-    return UA_SecureChannel_processChunk(connection->channel, chunk,
-                                         processSecureChannelMessage,
-                                         server);
+    return UA_SecureChannel_decryptAddChunk(connection->channel, chunk);
 }
 
-static void
-processBinaryMessage(UA_Server *server, UA_Connection *connection,
-                     UA_ByteString *message) {
+void
+UA_Server_processBinaryMessage(UA_Server *server, UA_Connection *connection,
+                               UA_ByteString *message) {
     UA_LOG_TRACE(server->config.logger, UA_LOGCATEGORY_NETWORK,
                  "Connection %i | Received a packet.", connection->sockfd);
 #ifdef UA_DEBUG_DUMP_PKGS
@@ -762,48 +765,21 @@ processBinaryMessage(UA_Server *server, UA_Connection *connection,
         error.reason = UA_STRING_NULL;
         UA_Connection_sendError(connection, &error);
         connection->close(connection);
+        return;
     }
-}
-
-#ifndef UA_ENABLE_MULTITHREADING
 
-void
-UA_Server_processBinaryMessage(UA_Server *server, UA_Connection *connection,
-                               UA_ByteString *message) {
-    processBinaryMessage(server, connection, message);
-}
-
-#else
-
-typedef struct {
-    UA_Connection *connection;
-    UA_ByteString message;
-} ConnectionMessage;
-
-static void
-workerProcessBinaryMessage(UA_Server *server, ConnectionMessage *cm) {
-    processBinaryMessage(server, cm->connection, &cm->message);
-    UA_free(cm);
-}
-
-void
-UA_Server_processBinaryMessage(UA_Server *server, UA_Connection *connection,
-                               UA_ByteString *message) {
-    /* Allocate the memory for the callback data */
-    ConnectionMessage *cm = (ConnectionMessage*)UA_malloc(sizeof(ConnectionMessage));
-
-    /* If malloc failed, execute immediately */
-    if(!cm) {
-        processBinaryMessage(server, connection, message);
+    if(!connection->channel)
         return;
-    }
 
-    /* Dispatch to the workers */
-    cm->connection = connection;
-    cm->message = *message;
-    UA_Server_workerCallback(server, (UA_ServerCallback)workerProcessBinaryMessage, cm);
+    /* Process complete messages */
+    UA_SecureChannel_processCompleteMessages(connection->channel, server,
+                                             processSecureChannelMessage);
+
+    /* Store unused chunks internally in the SecureChannel */
+    UA_SecureChannel_persistIncompleteMessages(connection->channel);
 }
 
+#ifdef UA_ENABLE_MULTITHREADING
 static void
 deleteConnectionTrampoline(UA_Server *server, void *data) {
     UA_Connection *connection = (UA_Connection*)data;

+ 210 - 120
src/ua_securechannel.c

@@ -79,6 +79,38 @@ UA_SecureChannel_init(UA_SecureChannel *channel,
     return retval;
 }
 
+static void deleteMessage(UA_Message *me) {
+    UA_ChunkPayload *cp;
+    while((cp = SIMPLEQ_FIRST(&me->chunkPayloads))) {
+        if(cp->copied)
+            UA_ByteString_deleteMembers(&cp->bytes);
+        SIMPLEQ_REMOVE_HEAD(&me->chunkPayloads, pointers);
+        UA_free(cp);
+    }
+    UA_free(me);
+}
+
+static void
+deleteLatestMessage(UA_SecureChannel *channel, UA_UInt32 requestId) {
+    UA_Message *me = TAILQ_LAST(&channel->messages, UA_MessageQueue);
+    if(!me)
+        return;
+    if(me->requestId != requestId)
+        return;
+
+    TAILQ_REMOVE(&channel->messages, me, pointers);
+    deleteMessage(me);
+}
+
+void
+UA_SecureChannel_deleteMessages(UA_SecureChannel *channel) {
+    UA_Message *me, *me_tmp;
+    TAILQ_FOREACH_SAFE(me, &channel->messages, pointers, me_tmp) {
+        TAILQ_REMOVE(&channel->messages, me, pointers);
+        deleteMessage(me);
+    }
+}
+
 void
 UA_SecureChannel_deleteMembersCleanup(UA_SecureChannel *channel) {
     /* Delete members */
@@ -107,17 +139,8 @@ UA_SecureChannel_deleteMembersCleanup(UA_SecureChannel *channel) {
         LIST_REMOVE(sh, pointers);
     }
 
-    /* Remove the buffered chunks */
-    struct MessageEntry *me, *temp_me;
-    LIST_FOREACH_SAFE(me, &channel->chunks, pointers, temp_me) {
-        struct ChunkPayload *cp, *temp_cp;
-        SIMPLEQ_FOREACH_SAFE(cp, &me->chunkPayload, pointers, temp_cp) {
-            UA_ByteString_deleteMembers(&cp->bytes);
-            UA_free(cp);
-        }
-        LIST_REMOVE(me, pointers);
-        UA_free(me);
-    }
+    /* Remove the buffered messages */
+    UA_SecureChannel_deleteMessages(channel);
 }
 
 UA_StatusCode
@@ -225,7 +248,7 @@ UA_SecureChannel_generateNewKeys(UA_SecureChannel *channel) {
 UA_SessionHeader *
 UA_SecureChannel_getSession(UA_SecureChannel *channel,
                             const UA_NodeId *authenticationToken) {
-    struct UA_SessionHeader *sh;
+    UA_SessionHeader *sh;
     LIST_FOREACH(sh, &channel->sessions, pointers) {
         if(UA_NodeId_equal(&sh->authenticationToken, authenticationToken))
             break;
@@ -727,101 +750,123 @@ UA_SecureChannel_sendSymmetricMessage(UA_SecureChannel *channel, UA_UInt32 reque
 /* Assemble Complete Message */
 /*****************************/
 
-static void
-UA_SecureChannel_removeChunks(UA_SecureChannel *channel, UA_UInt32 requestId) {
-    struct MessageEntry *me;
-    LIST_FOREACH(me, &channel->chunks, pointers) {
-        if(me->requestId == requestId) {
-            struct ChunkPayload *cp, *temp_cp;
-            SIMPLEQ_FOREACH_SAFE(cp, &me->chunkPayload, pointers, temp_cp) {
-                UA_ByteString_deleteMembers(&cp->bytes);
-                UA_free(cp);
-            }
-            LIST_REMOVE(me, pointers);
-            UA_free(me);
-            return;
-        }
-    }
-}
-
 static UA_StatusCode
-appendChunk(struct MessageEntry *messageEntry, const UA_ByteString *chunkBody) {
-    
-    struct ChunkPayload* cp = (struct ChunkPayload*)UA_malloc(sizeof(struct ChunkPayload));
-    UA_StatusCode retval = UA_ByteString_copy(chunkBody, &cp->bytes);
-    if (retval != UA_STATUSCODE_GOOD)
-        return retval;
-    
-    SIMPLEQ_INSERT_TAIL(&messageEntry->chunkPayload, cp, pointers);
-    messageEntry->chunkPayloadSize += chunkBody->length;
-    return UA_STATUSCODE_GOOD;
-}
-
-static UA_StatusCode
-UA_SecureChannel_appendChunk(UA_SecureChannel *channel, UA_UInt32 requestId,
-                             const UA_ByteString *chunkBody) {
-    struct MessageEntry *me;
-    LIST_FOREACH(me, &channel->chunks, pointers) {
-        if(me->requestId == requestId)
-            break;
+addChunkPayload(UA_SecureChannel *channel, UA_UInt32 requestId,
+                UA_MessageType messageType, UA_ByteString *chunkPayload,
+                UA_Boolean final) {
+    UA_Message *latest = TAILQ_LAST(&channel->messages, UA_MessageQueue);
+    if(latest) {
+        if(latest->requestId != requestId) {
+            /* Start of a new message */
+            if(!latest->final)
+                return UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
+            latest = NULL;
+        } else {
+            if(latest->messageType != messageType) /* MessageType mismatch */
+                return UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
+            if(latest->final) /* Correct message, but already finalized */
+                return UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
+        }
     }
 
-    /* No chunkentry on the channel, create one */
-    if(!me) {
-        me = (struct MessageEntry *)UA_malloc(sizeof(struct MessageEntry));
-        if(!me)
+    /* Create a new message entry */
+    if(!latest) {
+        latest = (UA_Message*)UA_malloc(sizeof(UA_Message));
+        if(!latest)
             return UA_STATUSCODE_BADOUTOFMEMORY;
-        memset(me, 0, sizeof(struct MessageEntry));
-        me->requestId = requestId;
-        SIMPLEQ_INIT(&me->chunkPayload);
-        LIST_INSERT_HEAD(&channel->chunks, me, pointers);
+        memset(latest, 0, sizeof(UA_Message));
+        latest->requestId = requestId;
+        latest->messageType = messageType;
+        SIMPLEQ_INIT(&latest->chunkPayloads);
+        TAILQ_INSERT_TAIL(&channel->messages, latest, pointers);
     }
 
-    return appendChunk(me, chunkBody);
+    /* Test against the connection settings */
+    const UA_ConnectionConfig *config = &channel->connection->localConf;
+    if(config->maxChunkCount > 0 &&
+       config->maxChunkCount <= latest->chunkPayloadsSize)
+        return UA_STATUSCODE_BADRESPONSETOOLARGE;
+    if(config->maxMessageSize > 0 &&
+       config->maxMessageSize < latest->messageSize + chunkPayload->length)
+        return UA_STATUSCODE_BADRESPONSETOOLARGE;
+
+    /* Create a new chunk entry */
+    UA_ChunkPayload* cp = (UA_ChunkPayload*)UA_malloc(sizeof(UA_ChunkPayload));
+    if(!cp)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    cp->bytes = *chunkPayload;
+    cp->copied = false;
+
+    /* Add the chunk */
+    SIMPLEQ_INSERT_TAIL(&latest->chunkPayloads, cp, pointers);
+    latest->chunkPayloadsSize += 1;
+    latest->messageSize += chunkPayload->length;
+    latest->final = final;
+
+    return UA_STATUSCODE_GOOD;
 }
 
 static UA_StatusCode
-UA_SecureChannel_finalizeChunk(UA_SecureChannel *channel, UA_UInt32 requestId,
-                               const UA_ByteString *chunkBody, UA_MessageType messageType,
-                               UA_ProcessMessageCallback callback, void *application) {
-    struct MessageEntry *messageEntry;
-    LIST_FOREACH(messageEntry, &channel->chunks, pointers) {
-        if(messageEntry->requestId == requestId)
-            break;
-    }
-
-    UA_ByteString bytes;
-    if(!messageEntry) {
-        bytes = *chunkBody;
+processMessage(UA_SecureChannel *channel, const UA_Message *message,
+               void *application, UA_ProcessMessageCallback callback) {
+    if(message->chunkPayloadsSize == 1) {
+        /* No need to combine chunks */
+        UA_ChunkPayload *cp = SIMPLEQ_FIRST(&message->chunkPayloads);
+        callback(application, channel, message->messageType, message->requestId, &cp->bytes);
     } else {
-        UA_StatusCode retval = appendChunk(messageEntry, chunkBody);
-        if(retval != UA_STATUSCODE_GOOD)
-            return retval;
-        
-        UA_ByteString_init(&bytes);
-
-        bytes.data = (UA_Byte*) UA_malloc(messageEntry->chunkPayloadSize);
-        if (!bytes.data)
+        /* Allocate memory */
+        UA_ByteString bytes;
+        bytes.data = (UA_Byte*) UA_malloc(message->messageSize);
+        if(!bytes.data) {
+            UA_LOG_WARNING(channel->securityPolicy->logger, UA_LOGCATEGORY_SECURECHANNEL,
+                           "Could not allocate the memory to assemble the message");
             return UA_STATUSCODE_BADOUTOFMEMORY;
+        }
+        bytes.length = message->messageSize;
 
-        struct ChunkPayload *cp, *temp_cp;
+        /* Assemble the full message */
         size_t curPos = 0;
-        SIMPLEQ_FOREACH_SAFE(cp, &messageEntry->chunkPayload, pointers, temp_cp) {
+        UA_ChunkPayload *cp;
+        SIMPLEQ_FOREACH(cp, &message->chunkPayloads, pointers) {
             memcpy(&bytes.data[curPos], cp->bytes.data, cp->bytes.length);
             curPos += cp->bytes.length;
-            UA_ByteString_deleteMembers(&cp->bytes);
-            UA_free(cp);
         }
 
-        bytes.length = messageEntry->chunkPayloadSize;
-
-        LIST_REMOVE(messageEntry, pointers);
-        UA_free(messageEntry);
+        /* Process the message */
+        callback(application, channel, message->messageType, message->requestId, &bytes);
+        UA_ByteString_deleteMembers(&bytes);
     }
+    return UA_STATUSCODE_GOOD;
+}
 
-    UA_StatusCode retval = callback(application, channel, messageType, requestId, &bytes);
-    if(messageEntry)
-        UA_ByteString_deleteMembers(&bytes);
+UA_StatusCode
+UA_SecureChannel_processCompleteMessages(UA_SecureChannel *channel, void *application,
+                                         UA_ProcessMessageCallback callback) {
+    UA_Message *me, *me_tmp;
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
+    TAILQ_FOREACH_SAFE(me, &channel->messages, pointers, me_tmp) {
+        /* Stop at the first incomplete message */
+        if(!me->final)
+            break;
+
+        /* Remove the current message before processing */
+        TAILQ_REMOVE(&channel->messages, me, pointers);
+
+        /* Process */
+        retval = processMessage(channel, me, application, callback);
+        if(retval != UA_STATUSCODE_GOOD)
+            break;
+
+        /* Clean up the message */
+        UA_ChunkPayload *cp;
+        while((cp = SIMPLEQ_FIRST(&me->chunkPayloads))) {
+            if(cp->copied)
+                UA_ByteString_deleteMembers(&cp->bytes);
+            SIMPLEQ_REMOVE_HEAD(&me->chunkPayloads, pointers);
+            UA_free(cp);
+        }
+        UA_free(me);
+    }
     return retval;
 }
 
@@ -829,10 +874,12 @@ UA_SecureChannel_finalizeChunk(UA_SecureChannel *channel, UA_UInt32 requestId,
 /* Process a received Chunk */
 /****************************/
 
+/* Sets the payload to a pointer inside the chunk buffer. Returns the requestId
+ * and the sequenceNumber */
 static UA_StatusCode
-decryptChunk(UA_SecureChannel *channel, const UA_SecurityPolicyCryptoModule *cryptoModule,
-             UA_ByteString *chunk, size_t offset, UA_UInt32 *requestId, UA_UInt32 *sequenceNumber,
-             UA_ByteString *payload, UA_MessageType messageType) {
+decryptChunk(const UA_SecureChannel *channel, const UA_SecurityPolicyCryptoModule *cryptoModule,
+             UA_MessageType messageType, const UA_ByteString *chunk, size_t offset,
+             UA_UInt32 *requestId, UA_UInt32 *sequenceNumber, UA_ByteString *payload) {
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
     const UA_SecurityPolicy *securityPolicy = channel->securityPolicy;
     size_t chunkSizeAfterDecryption = chunk->length;
@@ -973,11 +1020,10 @@ checkSymHeader(UA_SecureChannel *const channel,
     return UA_STATUSCODE_GOOD;
 }
 
-UA_StatusCode
-UA_SecureChannel_processChunk(UA_SecureChannel *channel, UA_ByteString *chunk,
-                              UA_ProcessMessageCallback callback,
-                              void *application) {
-    /* Decode message header */
+/* The chunk body begins after the SecureConversationMessageHeader */
+static UA_StatusCode
+decryptAddChunk(UA_SecureChannel *channel, const UA_ByteString *chunk) {
+    /* Decode the MessageHeader */
     size_t offset = 0;
     UA_SecureConversationMessageHeader messageHeader;
     UA_StatusCode retval =
@@ -997,7 +1043,6 @@ UA_SecureChannel_processChunk(UA_SecureChannel *channel, UA_ByteString *chunk,
     UA_ChunkType chunkType = (UA_ChunkType)
         (messageHeader.messageHeader.messageTypeAndChunkType & UA_BITMASK_CHUNKTYPE);
 
-    /* ERR message (not encrypted) */
     UA_UInt32 requestId = 0;
     UA_UInt32 sequenceNumber = 0;
     UA_ByteString chunkPayload;
@@ -1005,14 +1050,15 @@ UA_SecureChannel_processChunk(UA_SecureChannel *channel, UA_ByteString *chunk,
     UA_SequenceNumberCallback sequenceNumberCallback = NULL;
 
     switch(messageType) {
-    case UA_MESSAGETYPE_ERR: {
+        /* ERR message (not encrypted) */
+    case UA_MESSAGETYPE_ERR:
         if(chunkType != UA_CHUNKTYPE_FINAL)
             return UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
         chunkPayload.length = chunk->length - offset;
         chunkPayload.data = chunk->data + offset;
-        return callback(application, channel, messageType, requestId, &chunkPayload);
-    }
+        goto addPayload;
 
+        /* MSG and CLO: Symmetric encryption */
     case UA_MESSAGETYPE_MSG:
     case UA_MESSAGETYPE_CLO: {
         /* Decode and check the symmetric security header (tokenId) */
@@ -1036,6 +1082,8 @@ UA_SecureChannel_processChunk(UA_SecureChannel *channel, UA_ByteString *chunk,
         sequenceNumberCallback = processSequenceNumberSym;
         break;
     }
+
+        /* OPN: Asymmetric encryption */
     case UA_MESSAGETYPE_OPN: {
         /* Chunking not allowed for OPN */
         if(chunkType != UA_CHUNKTYPE_FINAL)
@@ -1059,44 +1107,86 @@ UA_SecureChannel_processChunk(UA_SecureChannel *channel, UA_ByteString *chunk,
         sequenceNumberCallback = processSequenceNumberAsym;
         break;
     }
-    default:return UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
+
+        /* Invalid message type */
+    default:
+        return UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
     }
 
     /* Decrypt message */
     UA_assert(cryptoModule != NULL);
-    retval = decryptChunk(channel, cryptoModule, chunk, offset, &requestId,
-                          &sequenceNumber, &chunkPayload, messageType);
+    retval = decryptChunk(channel, cryptoModule, messageType, chunk, offset, 
+                          &requestId, &sequenceNumber, &chunkPayload);
     if(retval != UA_STATUSCODE_GOOD)
         return retval;
 
-    /* Check the sequence number */
+#if !defined(FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION)
+    /* Check the sequence number. Skip sequence number checking for fuzzer to
+     * improve coverage */
     if(sequenceNumberCallback == NULL)
         return UA_STATUSCODE_BADINTERNALERROR;
     retval = sequenceNumberCallback(channel, sequenceNumber);
-
-    /* Skip sequence number checking for fuzzer to improve coverage */
-    if(retval != UA_STATUSCODE_GOOD) {
-#if !defined(FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION)
+    if(retval != UA_STATUSCODE_GOOD)
         return retval;
-#else
-        retval = UA_STATUSCODE_GOOD;
 #endif
-    }
 
-    /* Process the payload */
-    if(chunkType == UA_CHUNKTYPE_FINAL) {
-        retval = UA_SecureChannel_finalizeChunk(channel, requestId, &chunkPayload,
-                                                messageType, callback, application);
-    } else if(chunkType == UA_CHUNKTYPE_INTERMEDIATE) {
-        retval = UA_SecureChannel_appendChunk(channel, requestId, &chunkPayload);
-    } else if(chunkType == UA_CHUNKTYPE_ABORT) {
-        UA_SecureChannel_removeChunks(channel, requestId);
-    } else {
+    /* Add the payload to the (new) message */
+ addPayload:
+    switch(chunkType) {
+    case UA_CHUNKTYPE_INTERMEDIATE:
+    case UA_CHUNKTYPE_FINAL:
+        retval = addChunkPayload(channel, requestId, messageType,
+                                 &chunkPayload, chunkType == UA_CHUNKTYPE_FINAL);
+        break;
+    case UA_CHUNKTYPE_ABORT:
+        deleteLatestMessage(channel, requestId);
+        break;
+    default:
         retval = UA_STATUSCODE_BADTCPMESSAGETYPEINVALID;
     }
     return retval;
 }
 
+UA_StatusCode
+UA_SecureChannel_decryptAddChunk(UA_SecureChannel *channel, const UA_ByteString *chunk) {
+    /* Has the SecureChannel timed out? */
+    if(channel->state == UA_SECURECHANNELSTATE_CLOSED)
+        return UA_STATUSCODE_BADSECURECHANNELCLOSED;
+    if(channel->state == UA_SECURECHANNELSTATE_OPEN &&
+       (channel->securityToken.createdAt +
+        (channel->securityToken.revisedLifetime * UA_DATETIME_MSEC))
+       < UA_DateTime_nowMonotonic()) {
+        UA_SecureChannel_deleteMembersCleanup(channel);
+        return UA_STATUSCODE_BADSECURECHANNELCLOSED;
+    }
+
+    UA_StatusCode retval = decryptAddChunk(channel, chunk);
+    if(retval != UA_STATUSCODE_GOOD)
+        UA_SecureChannel_deleteMembersCleanup(channel);
+    return retval;
+}
+
+UA_StatusCode
+UA_SecureChannel_persistIncompleteMessages(UA_SecureChannel *channel) {
+    UA_Message *me;
+    TAILQ_FOREACH(me, &channel->messages, pointers) {
+        UA_ChunkPayload *cp;
+        SIMPLEQ_FOREACH(cp, &me->chunkPayloads, pointers) {
+            if(cp->copied)
+                continue;
+            UA_ByteString copy;
+            UA_StatusCode retval = UA_ByteString_copy(&cp->bytes, &copy);
+            if(retval != UA_STATUSCODE_GOOD) {
+                UA_SecureChannel_deleteMembersCleanup(channel);
+                return retval;
+            }
+            cp->bytes = copy;
+            cp->copied = true;
+        }
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
 /* Functionality used by both the SecureChannel and the SecurityPolicy */
 
 size_t

+ 45 - 23
src/ua_securechannel.h

@@ -44,17 +44,25 @@ typedef struct UA_SessionHeader {
 } UA_SessionHeader;
 
 /* For chunked requests */
-struct ChunkPayload {
-    SIMPLEQ_ENTRY(ChunkPayload) pointers;
+typedef struct UA_ChunkPayload {
+    SIMPLEQ_ENTRY(UA_ChunkPayload) pointers;
     UA_ByteString bytes;
-};
-
-struct MessageEntry {
-    LIST_ENTRY(MessageEntry) pointers;
+    UA_Boolean copied; /* Do the bytes point to a buffer from the network or was
+                          memory allocated for the chunk separately */
+} UA_ChunkPayload;
+
+/* Receieved messages. Process them only in order. The Chunk payload has all
+ * headers and the padding stripped out. The payload begins at the
+ * ExtensionObject prefix.*/
+typedef struct UA_Message {
+    TAILQ_ENTRY(UA_Message) pointers;
     UA_UInt32 requestId;
-    SIMPLEQ_HEAD(chunkpayload_pointerlist, ChunkPayload) chunkPayload;
-    size_t chunkPayloadSize;
-};
+    UA_MessageType messageType;
+    SIMPLEQ_HEAD(pp, UA_ChunkPayload) chunkPayloads;
+    size_t chunkPayloadsSize; /* No of chunks received so far */
+    size_t messageSize; /* Total length of the chunks received so far */
+    UA_Boolean final; /* All chunks for the message have been received */
+} UA_Message;
 
 typedef enum {
     UA_SECURECHANNELSTATE_FRESH,
@@ -62,6 +70,8 @@ typedef enum {
     UA_SECURECHANNELSTATE_CLOSED
 } UA_SecureChannelState;
 
+typedef TAILQ_HEAD(UA_MessageQueue, UA_Message) UA_MessageQueue;
+
 struct UA_SecureChannel {
     UA_SecureChannelState   state;
     UA_MessageSecurityMode  securityMode;
@@ -84,14 +94,18 @@ struct UA_SecureChannel {
     UA_UInt32 receiveSequenceNumber;
     UA_UInt32 sendSequenceNumber;
 
-    LIST_HEAD(session_pointerlist, UA_SessionHeader) sessions;
-    LIST_HEAD(chunk_pointerlist, MessageEntry) chunks;
+    LIST_HEAD(, UA_SessionHeader) sessions;
+    UA_MessageQueue messages;
 };
 
 UA_StatusCode
 UA_SecureChannel_init(UA_SecureChannel *channel,
                       const UA_SecurityPolicy *securityPolicy,
                       const UA_ByteString *remoteCertificate);
+
+/* Remove (partially) received unprocessed messages */
+void UA_SecureChannel_deleteMessages(UA_SecureChannel *channel);
+
 void UA_SecureChannel_deleteMembersCleanup(UA_SecureChannel *channel);
 
 /* Generates new keys and sets them in the channel context */
@@ -166,31 +180,39 @@ void
 UA_MessageContext_abort(UA_MessageContext *mc);
 
 /**
- * Process Received Chunks
- * ----------------------- */
+ * Receive Message
+ * --------------- */
 
-typedef UA_StatusCode
+/* Decrypt a chunk and add it to the message. Create a new message if necessary. */
+UA_StatusCode
+UA_SecureChannel_decryptAddChunk(UA_SecureChannel *channel, const UA_ByteString *chunk);
+
+/* The network buffer is about to be cleared. Copy all chunks that point into
+ * the network buffer into dedicated memory. */
+UA_StatusCode
+UA_SecureChannel_persistIncompleteMessages(UA_SecureChannel *channel);
+
+typedef void
 (UA_ProcessMessageCallback)(void *application, UA_SecureChannel *channel,
                             UA_MessageType messageType, UA_UInt32 requestId,
                             const UA_ByteString *message);
 
-/* Process a single chunk. This also decrypts the chunk if required. The
- * callback function is called with the complete message body if the message is
- * complete.
+/* Process received complete messages in-order. The callback function is called
+ * with the complete message body if the message is complete. The message is
+ * removed afterwards.
  *
  * Symmetric callback is ERR, MSG, CLO only
  * Asymmetric callback is OPN only
  *
  * @param channel the channel the chunks were received on.
- * @param chunks the memory region where the chunks are stored.
+ * @param application data pointer to application specific data that gets passed
+ *                    on to the callback function.
  * @param callback the callback function that gets called with the complete
  *                 message body, once a final chunk is processed.
- * @param application data pointer to application specific data that gets passed
- *                    on to the callback function. */
+ * @return Returns if an irrecoverable error occured. Maybe close the channel. */
 UA_StatusCode
-UA_SecureChannel_processChunk(UA_SecureChannel *channel, UA_ByteString *chunk,
-                              UA_ProcessMessageCallback callback,
-                              void *application);
+UA_SecureChannel_processCompleteMessages(UA_SecureChannel *channel, void *application,
+                                         UA_ProcessMessageCallback callback);
 
 /**
  * Log Helper

+ 9 - 14
tests/fuzz/ua_debug_dump_pkgs_file.c

@@ -126,17 +126,14 @@ UA_debug_dump_setName_withoutChannel(UA_Server *server, UA_Connection *connectio
  *
  * message is the decoded message starting at the nodeid of the content type.
  */
-static UA_StatusCode
+static void
 UA_debug_dump_setName_withChannel(void *application, UA_SecureChannel *channel,
-                            UA_MessageType messagetype, UA_UInt32 requestId,
-                            const UA_ByteString *message) {
+                                  UA_MessageType messagetype, UA_UInt32 requestId,
+                                  const UA_ByteString *message) {
     struct UA_dump_filename *dump_filename = (struct UA_dump_filename *)application;
-    UA_StatusCode retval = UA_STATUSCODE_GOOD;
     dump_filename->messageType = UA_debug_dumpGetMessageTypePrefix(messagetype);
-    if (messagetype == UA_MESSAGETYPE_MSG) {
+    if(messagetype == UA_MESSAGETYPE_MSG)
         UA_debug_dumpSetServiceName(message, dump_filename->serviceName);
-    }
-    return retval;
 }
 
 /**
@@ -156,14 +153,14 @@ UA_debug_dumpCompleteChunk(UA_Server *const server, UA_Connection *const connect
     if(!connection->channel) {
         UA_debug_dump_setName_withoutChannel(server, connection, messageBuffer, &dump_filename);
     } else {
-        // make a backup of the sequence number and reset it, because processChunk increases it
-        UA_UInt32 seqBackup = connection->channel->receiveSequenceNumber;
+        UA_SecureChannel dummy = *connection->channel;
+        TAILQ_INIT(&dummy.messages);
         UA_ByteString messageBufferCopy;
         UA_ByteString_copy(messageBuffer, &messageBufferCopy);
-        UA_SecureChannel_processChunk(connection->channel, &messageBufferCopy,
-                                      UA_debug_dump_setName_withChannel, &dump_filename);
+        UA_SecureChannel_decryptAddChunk(&dummy, &messageBufferCopy);
+        UA_SecureChannel_processCompleteMessages(&dummy, &dump_filename, UA_debug_dump_setName_withChannel);
+        UA_SecureChannel_deleteMessages(&dummy);
         UA_ByteString_deleteMembers(&messageBufferCopy);
-        connection->channel->receiveSequenceNumber = seqBackup;
     }
 
     char fileName[250];
@@ -186,5 +183,3 @@ UA_debug_dumpCompleteChunk(UA_Server *const server, UA_Connection *const connect
     fwrite(messageBuffer->data, messageBuffer->length, 1, write_ptr); // write 10 bytes from our buffer
     fclose(write_ptr);
 }
-
-