Selaa lähdekoodia

use callback to clean up chunk processing in the connection

Julius Pfrommer 7 vuotta sitten
vanhempi
commit
aa519069ad

+ 1 - 0
plugins/ua_network_tcp.c

@@ -505,6 +505,7 @@ ServerNetworkLayerTCP_listen(UA_ServerNetworkLayer *nl,
         if(retval == UA_STATUSCODE_GOOD) {
             /* Process packets */
             UA_Server_processBinaryMessage(server, &e->connection, &buf);
+            connection_releaserecvbuffer(&e->connection, &buf);
         } else if(retval == UA_STATUSCODE_BADCONNECTIONCLOSED) {
             /* The socket is shutdown but not closed */
             if(e->connection.state != UA_CONNECTION_CLOSED) {

+ 125 - 123
src/client/ua_client.c

@@ -85,6 +85,39 @@ UA_ClientState UA_Client_getState(UA_Client *client) {
 
 #define UA_MINMESSAGESIZE 8192
 
+static void
+processHelResponse(void *application, UA_Connection *connection,
+                   const UA_ByteString *chunk) {
+    UA_Client *client = (UA_Client*)application;
+
+    /* Decode the message */
+    size_t offset = 0;
+    UA_StatusCode retval;
+    UA_TcpMessageHeader messageHeader;
+    UA_TcpAcknowledgeMessage ackMessage;
+    retval = UA_TcpMessageHeader_decodeBinary(chunk, &offset, &messageHeader);
+    retval |= UA_TcpAcknowledgeMessage_decodeBinary(chunk, &offset, &ackMessage);
+
+    /* Store remote connection settings and adjust local configuration to not
+     * exceed the limits */
+    if(retval == UA_STATUSCODE_GOOD) {
+        UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_NETWORK, "Received ACK message");
+        connection->remoteConf.maxChunkCount = ackMessage.maxChunkCount; /* may be zero -> unlimited */
+        connection->remoteConf.maxMessageSize = ackMessage.maxMessageSize; /* may be zero -> unlimited */
+        connection->remoteConf.protocolVersion = ackMessage.protocolVersion;
+        connection->remoteConf.sendBufferSize = ackMessage.sendBufferSize;
+        connection->remoteConf.recvBufferSize = ackMessage.receiveBufferSize;
+        if(connection->remoteConf.recvBufferSize < connection->localConf.sendBufferSize)
+            connection->localConf.sendBufferSize = connection->remoteConf.recvBufferSize;
+        if(connection->remoteConf.sendBufferSize < connection->localConf.recvBufferSize)
+            connection->localConf.recvBufferSize = connection->remoteConf.sendBufferSize;
+        connection->state = UA_CONNECTION_ESTABLISHED;
+    } else {
+        UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_NETWORK, "Decoding ACK message failed");
+    }
+    UA_TcpAcknowledgeMessage_deleteMembers(&ackMessage);
+}
+
 static UA_StatusCode
 HelAckHandshake(UA_Client *client) {
     /* Get a buffer */
@@ -131,48 +164,89 @@ HelAckHandshake(UA_Client *client) {
                  "Sent HEL message");
 
     /* Loop until we have a complete chunk */
-    UA_ByteString reply = UA_BYTESTRING_NULL;
-    UA_Boolean realloced = false;
-    retval = UA_Connection_receiveChunksBlocking(conn, &reply, &realloced,
+    retval = UA_Connection_receiveChunksBlocking(conn, client, processHelResponse,
                                                  client->config.timeout);
-    if(retval != UA_STATUSCODE_GOOD) {
+    if(retval != UA_STATUSCODE_GOOD)
         UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_NETWORK,
                     "Receiving ACK message failed");
-        return retval;
-    }
+    return retval;
+}
 
-    /* Decode the message */
+struct OPNResponseData {
+    UA_Client *client;
+    UA_Boolean renew;
+};
+
+static void
+processOPNResponse(void *data, UA_Connection *connection,
+                   const UA_ByteString *chunk) {
+    struct OPNResponseData *rd = (struct OPNResponseData*)data;
+    UA_Client *client = rd->client;
+
+    /* Decode the header */
     size_t offset = 0;
-    UA_TcpAcknowledgeMessage ackMessage;
-    retval = UA_TcpMessageHeader_decodeBinary(&reply, &offset, &messageHeader);
-    retval |= UA_TcpAcknowledgeMessage_decodeBinary(&reply, &offset, &ackMessage);
+    UA_StatusCode retval;
+    UA_SecureConversationMessageHeader messageHeader;
+    UA_AsymmetricAlgorithmSecurityHeader asymHeader;
+    UA_SequenceHeader seqHeader;
+    UA_NodeId requestType;
+    retval = UA_SecureConversationMessageHeader_decodeBinary(chunk, &offset, &messageHeader);
+    retval |= UA_AsymmetricAlgorithmSecurityHeader_decodeBinary(chunk, &offset, &asymHeader);
+    retval |= UA_SequenceHeader_decodeBinary(chunk, &offset, &seqHeader);
+    retval |= UA_NodeId_decodeBinary(chunk, &offset, &requestType);
+    UA_NodeId expectedRequest =
+        UA_NODEID_NUMERIC(0, UA_TYPES[UA_TYPES_OPENSECURECHANNELRESPONSE].binaryEncodingId);
+    if(retval != UA_STATUSCODE_GOOD || !UA_NodeId_equal(&requestType, &expectedRequest)) {
+        UA_AsymmetricAlgorithmSecurityHeader_deleteMembers(&asymHeader);
+        UA_NodeId_deleteMembers(&requestType);
+        UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                     "Chunk answers the wrong request. Expected OpenSecureChannelResponse.");
+        client->state = UA_CLIENTSTATE_ERRORED;
+    }
 
-    /* Free the message buffer */
-    if(!realloced)
-        conn->releaseRecvBuffer(conn, &reply);
-    else
-        UA_ByteString_deleteMembers(&reply);
+    /* Save the sequence number from server */
+    client->channel.receiveSequenceNumber = seqHeader.sequenceNumber;
+
+    /* Decode the response */
+    UA_OpenSecureChannelResponse response;
+    retval = UA_OpenSecureChannelResponse_decodeBinary(chunk, &offset, &response);
+
+    /* Results in either the StatusCode of decoding or the service */
+    retval |= response.responseHeader.serviceResult;
 
-    /* Store remote connection settings and adjust local configuration to not
-       exceed the limits */
     if(retval == UA_STATUSCODE_GOOD) {
-        UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_NETWORK, "Received ACK message");
-        conn->remoteConf.maxChunkCount = ackMessage.maxChunkCount; /* may be zero -> unlimited */
-        conn->remoteConf.maxMessageSize = ackMessage.maxMessageSize; /* may be zero -> unlimited */
-        conn->remoteConf.protocolVersion = ackMessage.protocolVersion;
-        conn->remoteConf.sendBufferSize = ackMessage.sendBufferSize;
-        conn->remoteConf.recvBufferSize = ackMessage.receiveBufferSize;
-        if(conn->remoteConf.recvBufferSize < conn->localConf.sendBufferSize)
-            conn->localConf.sendBufferSize = conn->remoteConf.recvBufferSize;
-        if(conn->remoteConf.sendBufferSize < conn->localConf.recvBufferSize)
-            conn->localConf.recvBufferSize = conn->remoteConf.sendBufferSize;
-        conn->state = UA_CONNECTION_ESTABLISHED;
+        /* Response.securityToken.revisedLifetime is UInt32 we need to cast it
+         * to DateTime=Int64 we take 75% of lifetime to start renewing as
+         *  described in standard */
+        client->nextChannelRenewal = UA_DateTime_nowMonotonic() +
+            (UA_DateTime)(response.securityToken.revisedLifetime * (UA_Double)UA_MSEC_TO_DATETIME * 0.75);
+
+        /* Replace the old nonce */
+        UA_ChannelSecurityToken_deleteMembers(&client->channel.securityToken);
+        UA_ChannelSecurityToken_copy(&response.securityToken, &client->channel.securityToken);
+        UA_ByteString_deleteMembers(&client->channel.serverNonce);
+        UA_ByteString_copy(&response.serverNonce, &client->channel.serverNonce);
+
+        if(rd->renew)
+            UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
+                         "SecureChannel renewed");
+        else
+            UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
+                         "SecureChannel opened");
     } else {
-        UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_NETWORK, "Decoding ACK message failed");
+        if(rd->renew)
+            UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
+                        "SecureChannel could not be renewed "
+                        "with error code %s", UA_StatusCode_name(retval));
+        else
+            UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
+                        "SecureChannel could not be opened "
+                        "with error code %s", UA_StatusCode_name(retval));
     }
-    UA_TcpAcknowledgeMessage_deleteMembers(&ackMessage);
 
-    return retval;
+    /* Clean up */
+    UA_AsymmetricAlgorithmSecurityHeader_deleteMembers(&asymHeader);
+    UA_OpenSecureChannelResponse_deleteMembers(&response);
 }
 
 static UA_StatusCode
@@ -255,82 +329,14 @@ SecureChannelHandshake(UA_Client *client, UA_Boolean renew) {
         return retval;
 
     /* Receive the response */
-    UA_ByteString reply = UA_BYTESTRING_NULL;
-    UA_Boolean realloced = false;
-    retval = UA_Connection_receiveChunksBlocking(conn, &reply, &realloced,
+    struct OPNResponseData rd;
+    rd.client = client;
+    rd.renew = renew;
+    retval = UA_Connection_receiveChunksBlocking(conn, &rd, processOPNResponse,
                                                  client->config.timeout);
-    if(retval != UA_STATUSCODE_GOOD) {
+    if(retval != UA_STATUSCODE_GOOD)
         UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
                      "Receiving OpenSecureChannelResponse failed");
-        return retval;
-    }
-
-    /* Decode the header */
-    size_t offset = 0;
-    retval = UA_SecureConversationMessageHeader_decodeBinary(&reply, &offset, &messageHeader);
-    retval |= UA_AsymmetricAlgorithmSecurityHeader_decodeBinary(&reply, &offset, &asymHeader);
-    retval |= UA_SequenceHeader_decodeBinary(&reply, &offset, &seqHeader);
-    retval |= UA_NodeId_decodeBinary(&reply, &offset, &requestType);
-    UA_NodeId expectedRequest =
-        UA_NODEID_NUMERIC(0, UA_TYPES[UA_TYPES_OPENSECURECHANNELRESPONSE].binaryEncodingId);
-    if(retval != UA_STATUSCODE_GOOD || !UA_NodeId_equal(&requestType, &expectedRequest)) {
-        UA_ByteString_deleteMembers(&reply);
-        UA_AsymmetricAlgorithmSecurityHeader_deleteMembers(&asymHeader);
-        UA_NodeId_deleteMembers(&requestType);
-        UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                     "Reply answers the wrong request. Expected OpenSecureChannelResponse.");
-        return UA_STATUSCODE_BADINTERNALERROR;
-    }
-
-    /* Save the sequence number from server */
-    client->channel.receiveSequenceNumber = seqHeader.sequenceNumber;
-
-    /* Decode the response */
-    UA_OpenSecureChannelResponse response;
-    retval = UA_OpenSecureChannelResponse_decodeBinary(&reply, &offset, &response);
-
-    /* Free the message */
-    if(!realloced)
-        conn->releaseRecvBuffer(conn, &reply);
-    else
-        UA_ByteString_deleteMembers(&reply);
-
-    /* Results in either the StatusCode of decoding or the service */
-    retval |= response.responseHeader.serviceResult;
-
-    if(retval == UA_STATUSCODE_GOOD) {
-        /* Response.securityToken.revisedLifetime is UInt32 we need to cast it
-         * to DateTime=Int64 we take 75% of lifetime to start renewing as
-         *  described in standard */
-        client->nextChannelRenewal = UA_DateTime_nowMonotonic() +
-            (UA_DateTime)(response.securityToken.revisedLifetime * (UA_Double)UA_MSEC_TO_DATETIME * 0.75);
-
-        /* Replace the old nonce */
-        UA_ChannelSecurityToken_deleteMembers(&client->channel.securityToken);
-        UA_ChannelSecurityToken_copy(&response.securityToken, &client->channel.securityToken);
-        UA_ByteString_deleteMembers(&client->channel.serverNonce);
-        UA_ByteString_copy(&response.serverNonce, &client->channel.serverNonce);
-
-        if(renew)
-            UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
-                         "SecureChannel renewed");
-        else
-            UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
-                         "SecureChannel opened");
-    } else {
-        if(renew)
-            UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
-                        "SecureChannel could not be renewed "
-                        "with error code %s", UA_StatusCode_name(retval));
-        else
-            UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_SECURECHANNEL,
-                        "SecureChannel could not be opened "
-                        "with error code %s", UA_StatusCode_name(retval));
-    }
-
-    /* Clean up */
-    UA_AsymmetricAlgorithmSecurityHeader_deleteMembers(&asymHeader);
-    UA_OpenSecureChannelResponse_deleteMembers(&response);
     return retval;
 }
 
@@ -827,10 +833,17 @@ processServiceResponse(SyncResponseDescription *rd, UA_SecureChannel *channel,
     }
 }
 
+static void
+processChunk(void *application, UA_Connection *connection,
+             const UA_ByteString *chunk) {
+    SyncResponseDescription *rd = (SyncResponseDescription*)application;
+    UA_SecureChannel_processChunk(&rd->client->channel, chunk,
+                                  (UA_ProcessMessageCallback*)processServiceResponse, rd);
+}
+
 static UA_StatusCode
-receiveServiceResponse(UA_Client *client, void *response,
-                       const UA_DataType *responseType, UA_DateTime maxDate,
-                       UA_UInt32 *synchronousRequestId) {
+receiveServiceResponse(UA_Client *client, void *response, const UA_DataType *responseType,
+                       UA_DateTime maxDate, UA_UInt32 *synchronousRequestId) {
     /* Prepare the response and the structure we give into processServiceResponse */
     SyncResponseDescription rd = {client, false, 0, response, responseType};
 
@@ -841,33 +854,22 @@ receiveServiceResponse(UA_Client *client, void *response,
 
     do {
         /* Retrieve complete chunks */
-        UA_ByteString reply = UA_BYTESTRING_NULL;
-        UA_Boolean realloced = false;
         UA_DateTime now = UA_DateTime_nowMonotonic();
         if(now > maxDate)
             return UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
         UA_UInt32 timeout = (UA_UInt32)((maxDate - now) / UA_MSEC_TO_DATETIME);
         UA_StatusCode retval =
-            UA_Connection_receiveChunksBlocking(&client->connection, &reply,
-                                                &realloced, timeout);
+            UA_Connection_receiveChunksBlocking(&client->connection, &rd,
+                                                processChunk, timeout);
         if(retval != UA_STATUSCODE_GOOD) {
-            if (retval == UA_STATUSCODE_BADCONNECTIONCLOSED) {
-                // set client to error state so that call to connect will force a new connection attempt
+            if(retval == UA_STATUSCODE_BADCONNECTIONCLOSED) {
+                /* set client to error state so that call to connect will force
+                 * a new connection attempt */
                 client->state = UA_CLIENTSTATE_ERRORED;
             }
             return retval;
         }
-
-        /* ProcessChunks and call processServiceResponse for complete messages */
-        UA_SecureChannel_processChunks(&client->channel, &reply,
-                                       (UA_ProcessMessageCallback*)processServiceResponse, &rd);
-        /* Free the received buffer */
-        if(!realloced)
-            client->connection.releaseRecvBuffer(&client->connection, &reply);
-        else
-            UA_ByteString_deleteMembers(&reply);
     } while(!rd.received);
-
     return UA_STATUSCODE_GOOD;
 }
 

+ 37 - 46
src/server/ua_server_binary.c

@@ -624,50 +624,13 @@ UA_Server_processSecureChannelMessage(UA_Server *server, UA_SecureChannel *chann
     }
 }
 
-/* Takes the raw message from the network layer */
 static void
-processBinaryMessage(UA_Server *server, UA_Connection *connection,
+processCompleteChunk(UA_Server *server, UA_Connection *connection,
                      UA_ByteString *message) {
-    UA_LOG_TRACE(server->config.logger, UA_LOGCATEGORY_NETWORK,
-                 "Connection %i | Received a packet.", connection->sockfd);
-
-    #ifdef UA_DEBUG_DUMP_PKGS
-    UA_dump_hex_pkg(message->data, message->length);
-    #endif
-
-    UA_Boolean realloced = UA_FALSE;
-    UA_StatusCode retval = UA_Connection_completeChunks(connection, message, &realloced);
-
-    /* No failure, but no chunk ready */
-    if(message->length == 0) {
-        UA_LOG_TRACE(server->config.logger, UA_LOGCATEGORY_NETWORK,
-                     "Connection %i | Not a complete chunk yet.", connection->sockfd);
-        return;
-    }
-
-    /* Failed to complete a chunk */
-    if(retval != UA_STATUSCODE_GOOD) {
-        if(!realloced)
-            connection->releaseRecvBuffer(connection, message);
-        else
-            UA_ByteString_deleteMembers(message);
-        UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_NETWORK,
-                    "Connection %i | Failed to complete a chunk. "
-                    "Closing the connection.", connection->sockfd);
-        connection->close(connection);
-        return;
-    }
-
-    UA_SecureChannel *channel = connection->channel;
-    if(channel) {
-        /* Assemble chunks in the securechannel and process complete messages */
-        retval = UA_SecureChannel_processChunks(channel, message,
-                      (UA_ProcessMessageCallback*)UA_Server_processSecureChannelMessage, server);
-        if(retval != UA_STATUSCODE_GOOD)
-            UA_LOG_TRACE_CHANNEL(server->config.logger, channel, "Procesing chunks "
-                                 "resulted in error code %s", UA_StatusCode_name(retval));
-    } else {
-        /* Process messages without a channel and no chunking */
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
+    UA_SecureChannel *channel = connection->channel; 
+    if(!channel) {
+        /* Process chunk without a channel; must be OPN */
         UA_LOG_TRACE(server->config.logger, UA_LOGCATEGORY_NETWORK,
                      "Connection %i | No channel attached to the connection. "
                      "Process the chunk directly", connection->sockfd);
@@ -728,12 +691,40 @@ processBinaryMessage(UA_Server *server, UA_Connection *connection,
                          "Connection %i | Unknown message type", connection->sockfd);
             connection->close(connection);
         }
+        return;
     }
 
-    if(!realloced)
-        connection->releaseRecvBuffer(connection, message);
-    else
-        UA_ByteString_deleteMembers(message);
+    /* Process (and decode) chunk in the securechannel and process complete messages */
+    retval = UA_SecureChannel_processChunk(channel, message, (UA_ProcessMessageCallback*)
+                                           UA_Server_processSecureChannelMessage, server);
+    if(retval != UA_STATUSCODE_GOOD)
+        UA_LOG_TRACE_CHANNEL(server->config.logger, channel, "Procesing chunks "
+                             "resulted in error code %s", UA_StatusCode_name(retval));
+}
+
+/* Takes the raw message from the network layer */
+static void
+processBinaryMessage(UA_Server *server, UA_Connection *connection,
+                     UA_ByteString *message) {
+    UA_LOG_TRACE(server->config.logger, UA_LOGCATEGORY_NETWORK,
+                 "Connection %i | Received a packet.", connection->sockfd);
+
+#ifdef UA_DEBUG_DUMP_PKGS
+    UA_dump_hex_pkg(message->data, message->length);
+#endif
+
+    UA_StatusCode retval =
+        UA_Connection_processChunks(connection, server,
+                                    (UA_Connection_processChunk)processCompleteChunk,
+                                    message);
+
+    /* Failed to complete a chunk */
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_NETWORK,
+                    "Connection %i | Failed to complete a chunk. "
+                    "Closing the connection.", connection->sockfd);
+        connection->close(connection);
+    }
 }
 
 #ifndef UA_ENABLE_MULTITHREADING

+ 109 - 114
src/ua_connection.c

@@ -14,166 +14,162 @@ void UA_Connection_deleteMembers(UA_Connection *connection) {
 }
 
 static UA_StatusCode
-prependIncomplete(UA_Connection *connection, UA_ByteString * UA_RESTRICT message,
-                  UA_Boolean * UA_RESTRICT realloced) {
-    UA_assert(connection->incompleteMessage.length > 0);
-
+prependIncompleteChunk(UA_Connection *connection, UA_ByteString *message) {
     /* Allocate the new message buffer */
     size_t length = connection->incompleteMessage.length + message->length;
     UA_Byte *data = (UA_Byte*)UA_realloc(connection->incompleteMessage.data, length);
     if(!data) {
         UA_ByteString_deleteMembers(&connection->incompleteMessage);
-        connection->releaseRecvBuffer(connection, message);
         return UA_STATUSCODE_BADOUTOFMEMORY;
     }
 
     /* Copy / release the current message buffer */
     memcpy(&data[connection->incompleteMessage.length], message->data, message->length);
-    connection->releaseRecvBuffer(connection, message);
     message->length = length;
     message->data = data;
     connection->incompleteMessage = UA_BYTESTRING_NULL;
-    *realloced = true;
     return UA_STATUSCODE_GOOD;
 }
 
-static size_t
-completeChunksUntil(UA_Connection *connection, UA_ByteString * UA_RESTRICT message,
-                    UA_Boolean *garbage_end) {
-    size_t complete_until = 0;
-
-    /* At least 8 byte needed for the header... */
-    while(message->length - complete_until >= 8) {
-        /* Check the message type */
-        UA_UInt32 msgtype = (UA_UInt32)message->data[complete_until] +
-                           ((UA_UInt32)message->data[complete_until+1] << 8) +
-                           ((UA_UInt32)message->data[complete_until+2] << 16);
-        if(msgtype != ('M' + ('S' << 8) + ('G' << 16)) &&
-           msgtype != ('E' + ('R' << 8) + ('R' << 16)) &&
-           msgtype != ('O' + ('P' << 8) + ('N' << 16)) &&
-           msgtype != ('H' + ('E' << 8) + ('L' << 16)) &&
-           msgtype != ('A' + ('C' << 8) + ('K' << 16)) &&
-           msgtype != ('C' + ('L' << 8) + ('O' << 16))) {
-            *garbage_end = true; /* the message type is not recognized */
-            break;
-        }
+static UA_StatusCode
+bufferIncompleteChunk(UA_Connection *connection, const UA_Byte *pos, const UA_Byte *end) {
+    size_t length = (uintptr_t)end - (uintptr_t)pos;
+    UA_StatusCode retval = UA_ByteString_allocBuffer(&connection->incompleteMessage, length);
+    if(retval != UA_STATUSCODE_GOOD)
+        return retval;
+    memcpy(connection->incompleteMessage.data, pos, length);
+    return UA_STATUSCODE_GOOD;
+}
 
-        UA_Byte isFinal = message->data[complete_until+3];
-        if (isFinal != 'C' && isFinal != 'F' && isFinal != 'A') {
-            *garbage_end = true; /* the message type is not recognized */
-            break;
-        }
-
-        /* Decoding failed or the message size is not allowed. The remaining
-         * message is garbage. */
-        UA_UInt32 chunk_length = 0;
-        size_t length_pos = complete_until + 4;
-        if(UA_UInt32_decodeBinary(message, &length_pos, &chunk_length) != UA_STATUSCODE_GOOD ||
-           chunk_length < 16 || chunk_length > connection->localConf.recvBufferSize) {
-            *garbage_end = true;
-            break;
-        }
+static UA_Boolean
+processChunk(UA_Connection *connection, void *application,
+             UA_Connection_processChunk processCallback,
+             const UA_Byte **posp, const UA_Byte *end) {
+    const UA_Byte *pos = *posp;
+    size_t length = (uintptr_t)end - (uintptr_t)pos;
+
+    /* At least 8 byte needed for the header. Wait for the next chunk. */
+    if(length < 8) {
+        bufferIncompleteChunk(connection, pos, end);
+        return true;
+    }
 
-        /* The chunk is okay but incomplete. Store the end. */
-        if(chunk_length + complete_until > message->length)
-            break;
+    /* Check the message type */
+    UA_UInt32 msgtype = (UA_UInt32)pos[0] +
+                       ((UA_UInt32)pos[1] << 8) +
+                       ((UA_UInt32)pos[2] << 16);
+    if(msgtype != ('M' + ('S' << 8) + ('G' << 16)) &&
+       msgtype != ('E' + ('R' << 8) + ('R' << 16)) &&
+       msgtype != ('O' + ('P' << 8) + ('N' << 16)) &&
+       msgtype != ('H' + ('E' << 8) + ('L' << 16)) &&
+       msgtype != ('A' + ('C' << 8) + ('K' << 16)) &&
+       msgtype != ('C' + ('L' << 8) + ('O' << 16))) {
+        /* The message type is not recognized */
+        return true;
+    }
 
-        /* Continue to the next chunk */
-        complete_until += chunk_length;
+    UA_Byte isFinal = pos[3];
+    if(isFinal != 'C' && isFinal != 'F' && isFinal != 'A') {
+        /* The message type is not recognized */
+        return true;
     }
 
-    UA_assert(complete_until <= message->length);
-    return complete_until;
-}
+    UA_UInt32 chunk_length = 0;
+    UA_ByteString temp = {8, (UA_Byte*)(uintptr_t)pos}; /* At least 8 byte left */
+    size_t temp_offset = 4;
+    /* Decoding the UInt32 cannot fail */
+    UA_UInt32_decodeBinary(&temp, &temp_offset, &chunk_length);
 
-static UA_StatusCode
-separateIncompleteChunk(UA_Connection *connection, UA_ByteString * UA_RESTRICT message,
-                        size_t complete_until, UA_Boolean garbage_end, UA_Boolean *realloced) {
-    UA_assert(complete_until < message->length);
-
-    /* Garbage after the last good chunk. No need to keep an incomplete message. */
-    if(garbage_end) {
-        if(complete_until == 0) /* All garbage */
-            return UA_STATUSCODE_BADDECODINGERROR;
-        message->length = complete_until;
-        return UA_STATUSCODE_GOOD;
-    }
+    /* The message size is not allowed */
+    if(chunk_length < 16 || chunk_length > connection->localConf.recvBufferSize)
+        return true;
 
-    /* No good chunk, buffer the entire message */
-    if(complete_until == 0) {
-        if(!*realloced) {
-            UA_StatusCode retval =
-                UA_ByteString_allocBuffer(&connection->incompleteMessage, message->length);
-            if(retval != UA_STATUSCODE_GOOD)
-                return retval;
-            memcpy(connection->incompleteMessage.data, message->data, message->length);
-            connection->releaseRecvBuffer(connection, message);
-        } else {
-            connection->incompleteMessage = *message;
-            *message = UA_BYTESTRING_NULL;
-        }
-        return UA_STATUSCODE_GOOD;
+    /* Wait for the next packet to process the complete chunk */
+    if(chunk_length > length) {
+        bufferIncompleteChunk(connection, pos, end);
+        return true;
     }
 
-    /* At least one good chunk and an incomplete one */
-    size_t incomplete_length = message->length - complete_until;
-    UA_StatusCode retval =
-        UA_ByteString_allocBuffer(&connection->incompleteMessage, incomplete_length);
-    if(retval != UA_STATUSCODE_GOOD)
-        return retval;
-    memcpy(connection->incompleteMessage.data, &message->data[complete_until], incomplete_length);
-    message->length = complete_until;
-    return UA_STATUSCODE_GOOD;
+    /* Process the chunk */
+    temp.length = chunk_length;
+    processCallback(application, connection, &temp);
+
+    /* Continue to the next chunk */
+    *posp += chunk_length;
+    return false;
 }
 
 UA_StatusCode
-UA_Connection_completeChunks(UA_Connection *connection,
-                             UA_ByteString * UA_RESTRICT message,
-                             UA_Boolean * UA_RESTRICT realloced) {
+UA_Connection_processChunks(UA_Connection *connection, void *application,
+                            UA_Connection_processChunk processCallback,
+                            const UA_ByteString *packet) {
     /* If we have stored an incomplete chunk, prefix to the received message.
      * After this block, connection->incompleteMessage is always empty. The
      * message and the buffer is released if allocating the memory fails. */
+    UA_Boolean realloced = false;
+    UA_ByteString message = *packet;
     if(connection->incompleteMessage.length > 0) {
-        UA_StatusCode retval = prependIncomplete(connection, message, realloced);
+        UA_StatusCode retval = prependIncompleteChunk(connection, &message);
         if(retval != UA_STATUSCODE_GOOD)
             return retval;
+        realloced = true;
     }
 
-    /* Find the end of the last complete chunk */
-    UA_Boolean garbage_end = false; /* garbage after the last complete message */
-    size_t complete_until = completeChunksUntil(connection, message, &garbage_end);
-
-    /* Buffer incomplete chunk (at the end of the message) in the connection and
-     * adjust the size of the message. */
-    if(complete_until < message->length) {
-        UA_StatusCode retval = separateIncompleteChunk(connection, message, complete_until,
-                                                       garbage_end, realloced);
-        if(retval != UA_STATUSCODE_GOOD) {
-            if(*realloced)
-                UA_ByteString_deleteMembers(message);
-            else
-                connection->releaseRecvBuffer(connection, message);
-            return retval;
-        }
-    }
+    /* Loop over the received chunks. pos is increased with each chunk. */
+    const UA_Byte *pos = message.data;
+    const UA_Byte *end = &message.data[message.length];
+    UA_Boolean done;
+    do {
+        done = processChunk(connection, application, processCallback, &pos, end);
+    } while(!done);
+
+    if(realloced)
+        UA_ByteString_deleteMembers(&message);
     return UA_STATUSCODE_GOOD;
 }
 
+/* In order to know whether a chunk was processed, we insert an indirection into
+ * the callback. */
+struct completeChunkTrampolineData {
+    UA_Boolean called;
+    void *application;
+    UA_Connection_processChunk processCallback;
+};
+
+static void
+completeChunkTrampoline(void *application, UA_Connection *connection,
+                        const UA_ByteString *chunk) {
+    struct completeChunkTrampolineData *data =
+        (struct completeChunkTrampolineData*)application;
+    data->called = true;
+    data->processCallback(data->application, connection, chunk);
+}
+
 UA_StatusCode
-UA_Connection_receiveChunksBlocking(UA_Connection *connection, UA_ByteString *chunks,
-                                    UA_Boolean *realloced, UA_UInt32 timeout) {
+UA_Connection_receiveChunksBlocking(UA_Connection *connection, void *application,
+                                    UA_Connection_processChunk processCallback,
+                                    UA_UInt32 timeout) {
     UA_DateTime now = UA_DateTime_nowMonotonic();
     UA_DateTime maxDate = now + (timeout * UA_MSEC_TO_DATETIME);
-    *realloced = false;
+
+    struct completeChunkTrampolineData data;
+    data.called = false;
+    data.application = application;
+    data.processCallback = processCallback;
 
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
     while(true) {
         /* Listen for messages to arrive */
-        retval = connection->recv(connection, chunks, timeout);
+        UA_ByteString packet = UA_BYTESTRING_NULL;
+        retval = connection->recv(connection, &packet, timeout);
+        if(retval != UA_STATUSCODE_GOOD)
+            break;
 
-        /* Get complete chunks and return */
-        retval |= UA_Connection_completeChunks(connection, chunks, realloced);
-        if(retval != UA_STATUSCODE_GOOD || chunks->length > 0)
+        /* Try to process one complete chunk */
+        retval = UA_Connection_processChunks(connection, &data,
+                                             completeChunkTrampoline, &packet);
+        connection->releaseRecvBuffer(connection, &packet);
+        if(data.called)
             break;
 
         /* We received a message. But the chunk is incomplete. Compute the
@@ -186,7 +182,6 @@ UA_Connection_receiveChunksBlocking(UA_Connection *connection, UA_ByteString *ch
     return retval;
 }
 
-
 void UA_Connection_detachSecureChannel(UA_Connection *connection) {
     UA_SecureChannel *channel = connection->channel;
     if(channel)

+ 20 - 20
src/ua_connection_internal.h

@@ -11,9 +11,15 @@ extern "C" {
 
 #include "ua_plugin_network.h"
 
+/* The application can be the client or the server */
+typedef void (*UA_Connection_processChunk)(void *application,
+                                           UA_Connection *connection,
+                                           const UA_ByteString *chunk);
+
 /* The network layer may receive chopped up messages since TCP is a streaming
- * protocol. Furthermore, the networklayer may operate on ringbuffers or
- * statically assigned memory.
+ * protocol. This method calls the processChunk callback on all full chunks that
+ * were received. Dangling half-complete chunks are buffered in the connection
+ * and considered for the next received packet.
  *
  * If an entire chunk is received, it is forwarded directly. But the memory
  * needs to be freed with the networklayer-specific mechanism. If a half message
@@ -21,35 +27,29 @@ extern "C" {
  * needs to be used.
  *
  * @param connection The connection
- * @param message The received packet. The content may be replaced when a chunk
- *        is completed with previously received packets.
- * @param realloced The Boolean value is set to true if the outgoing message has
- *        been reallocated from the network layer.
+ * @param application The client or server application
+ * @param processCallback The function pointer for processing each chunk
+ * @param packet The received packet.
  * @return Returns UA_STATUSCODE_GOOD or an error code. When an error occurs,
  *         the ingoing message and the current buffer in the connection are
  *         freed. */
 UA_StatusCode
-UA_Connection_completeChunks(UA_Connection *connection,
-                               UA_ByteString * UA_RESTRICT message,
-                               UA_Boolean * UA_RESTRICT realloced);
+UA_Connection_processChunks(UA_Connection *connection, void *application,
+                            UA_Connection_processChunk processCallback,
+                            const UA_ByteString *packet);
 
 /* Try to receive at least one complete chunk on the connection. This blocks the
  * current thread up to the given timeout.
  *
  * @param connection The connection
- * @param chunk The received chunk. The memory is allocated either by the
- *        networklayer or internally.
- * @param realloced The Boolean value is set to true if the chunk has been
- *        reallocated from the network layer.
+ * @param application The client or server application
+ * @param processCallback The function pointer for processing each chunk
  * @param timeout The timeout (in milliseconds) the method will block at most.
- * @return Returns UA_STATUSCODE_GOOD or an error code. When an error occurs,
- *         the chunk buffer is returned empty. Upon a timeout,
- *         UA_STATUSCODE_GOODNONCRITICALTIMEOUT is returned.
- */
+ * @return Returns UA_STATUSCODE_GOOD or an error code. When an timeout occurs,
+ *         UA_STATUSCODE_GOODNONCRITICALTIMEOUT is returned. */
 UA_StatusCode
-UA_Connection_receiveChunksBlocking(UA_Connection *connection,
-                                    UA_ByteString *chunks,
-                                    UA_Boolean *realloced,
+UA_Connection_receiveChunksBlocking(UA_Connection *connection, void *application,
+                                    UA_Connection_processChunk processCallback,
                                     UA_UInt32 timeout);
 
 void UA_Connection_detachSecureChannel(UA_Connection *connection);

+ 90 - 110
src/ua_securechannel.c

@@ -256,8 +256,11 @@ UA_SecureChannel_sendBinaryMessage(UA_SecureChannel *channel, UA_UInt32 requestI
 /* Process Received Chunks */
 /***************************/
 
+/* Chunks that still miss the final part are buffered decoded. New chunks are
+ * appended to a single ByteString.*/
+
 static void
-UA_SecureChannel_removeChunk(UA_SecureChannel *channel, UA_UInt32 requestId) {
+UA_SecureChannel_removeChunks(UA_SecureChannel *channel, UA_UInt32 requestId) {
     struct ChunkEntry *ch;
     LIST_FOREACH(ch, &channel->chunks, pointers) {
         if(ch->requestId == requestId) {
@@ -271,8 +274,8 @@ UA_SecureChannel_removeChunk(UA_SecureChannel *channel, UA_UInt32 requestId) {
 
 /* assume that chunklength fits */
 static void
-appendChunk(struct ChunkEntry *ch, const UA_ByteString *msg,
-            size_t offset, size_t chunklength) {
+appendChunk(struct ChunkEntry *ch, const UA_ByteString *msg, size_t offset) {
+    size_t chunklength = msg->length - offset;
     UA_Byte* new_bytes = (UA_Byte *)UA_realloc(ch->bytes.data, ch->bytes.length + chunklength);
     if(!new_bytes) {
         UA_ByteString_deleteMembers(&ch->bytes);
@@ -285,15 +288,7 @@ appendChunk(struct ChunkEntry *ch, const UA_ByteString *msg,
 
 static void
 UA_SecureChannel_appendChunk(UA_SecureChannel *channel, UA_UInt32 requestId,
-                             const UA_ByteString *msg, size_t offset,
-                             size_t chunklength) {
-    /* Check if the chunk fits into the message */
-    if(msg->length - offset < chunklength) {
-        /* can't process all chunks for that request */
-        UA_SecureChannel_removeChunk(channel, requestId);
-        return;
-    }
-
+                             const UA_ByteString *msg, size_t offset) {
     /* Get the chunkentry */
     struct ChunkEntry *ch;
     LIST_FOREACH(ch, &channel->chunks, pointers) {
@@ -311,19 +306,14 @@ UA_SecureChannel_appendChunk(UA_SecureChannel *channel, UA_UInt32 requestId,
         LIST_INSERT_HEAD(&channel->chunks, ch, pointers);
     }
 
-    appendChunk(ch, msg, offset, chunklength);
+    appendChunk(ch, msg, offset);
 }
 
-static UA_ByteString
+static void
 UA_SecureChannel_finalizeChunk(UA_SecureChannel *channel, UA_UInt32 requestId,
                                const UA_ByteString *msg, size_t offset,
-                               size_t chunklength, UA_Boolean *deleteChunk) {
-    if(msg->length - offset < chunklength) {
-        /* can't process all chunks for that request */
-        UA_SecureChannel_removeChunk(channel, requestId);
-        return UA_BYTESTRING_NULL;
-    }
-
+                               UA_MessageType messageType, UA_ProcessMessageCallback callback,
+                               void *application) {
     struct ChunkEntry *ch;
     LIST_FOREACH(ch, &channel->chunks, pointers) {
         if(ch->requestId == requestId)
@@ -332,17 +322,18 @@ UA_SecureChannel_finalizeChunk(UA_SecureChannel *channel, UA_UInt32 requestId,
 
     UA_ByteString bytes;
     if(!ch) {
-        *deleteChunk = false;
-        bytes.length = chunklength;
+        bytes.length = msg->length - offset;
         bytes.data = msg->data + offset;
     } else {
-        *deleteChunk = true;
-        appendChunk(ch, msg, offset, chunklength);
+        appendChunk(ch, msg, offset);
         bytes = ch->bytes;
         LIST_REMOVE(ch, pointers);
         UA_free(ch);
     }
-    return bytes;
+
+    callback(application, channel, messageType, requestId, &bytes);
+    if(ch)
+        UA_ByteString_deleteMembers(&bytes);
 }
 
 static UA_StatusCode
@@ -359,103 +350,92 @@ UA_SecureChannel_processSequenceNumber(UA_SecureChannel *channel, UA_UInt32 Sequ
 }
 
 UA_StatusCode
-UA_SecureChannel_processChunks(UA_SecureChannel *channel, const UA_ByteString *chunks,
-                               UA_ProcessMessageCallback callback, void *application) {
+UA_SecureChannel_processChunk(UA_SecureChannel *channel, const UA_ByteString *chunk,
+                              UA_ProcessMessageCallback callback, void *application) {
+    UA_assert(chunk->length >= 16);
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
     size_t offset= 0;
-    do {
-
-        if (chunks->length > 3 && chunks->data[offset] == 'E' &&
-                chunks->data[offset+1] == 'R' && chunks->data[offset+2] == 'R') {
-            UA_TcpMessageHeader header;
-            retval = UA_TcpMessageHeader_decodeBinary(chunks, &offset, &header);
-            if(retval != UA_STATUSCODE_GOOD)
-                break;
-
-            UA_TcpErrorMessage errorMessage;
-            retval = UA_TcpErrorMessage_decodeBinary(chunks, &offset, &errorMessage);
-            if(retval != UA_STATUSCODE_GOOD)
-                break;
-
-            /* TODO: fix dirty cast to pass errorMessage */
-            UA_UInt32 val = 0;
-            callback(application, (UA_SecureChannel *)channel, (UA_MessageType)UA_MESSAGETYPE_ERR,
-                     val, (const UA_ByteString*)&errorMessage);
-            UA_TcpErrorMessage_deleteMembers(&errorMessage);
-            continue;
-        }
 
-        /* Store the initial offset to compute the header length */
-        size_t initial_offset = offset;
+    /* Received an ERR */
+    if(chunk->data[0] == 'E' &&
+       chunk->data[1] == 'R' &&
+       chunk->data[2] == 'R') {
+        UA_TcpMessageHeader header;
+        retval = UA_TcpMessageHeader_decodeBinary(chunk, &offset, &header);
+        if(retval != UA_STATUSCODE_GOOD)
+            return retval;
 
-        /* Decode header */
-        UA_SecureConversationMessageHeader header;
-        retval = UA_SecureConversationMessageHeader_decodeBinary(chunks, &offset, &header);
+        UA_TcpErrorMessage errorMessage;
+        retval = UA_TcpErrorMessage_decodeBinary(chunk, &offset, &errorMessage);
         if(retval != UA_STATUSCODE_GOOD)
-            break;
+            return retval;
 
-        /* Is the channel attached to connection? */
-        if(header.secureChannelId != channel->securityToken.channelId) {
-            //Service_CloseSecureChannel(server, channel);
-            //connection->close(connection);
-            return UA_STATUSCODE_BADCOMMUNICATIONERROR;
-        }
+        /* TODO: fix dirty cast to pass errorMessage */
+        UA_UInt32 val = 0;
+        callback(application, (UA_SecureChannel *)channel,
+                 (UA_MessageType)UA_MESSAGETYPE_ERR,
+                 val, (const UA_ByteString*)&errorMessage);
+        UA_TcpErrorMessage_deleteMembers(&errorMessage);
+        return UA_STATUSCODE_GOOD;
+    }
 
-        /* Use requestId = 0 with OPN as argument for the callback */
-        UA_SequenceHeader sequenceHeader;
-        UA_SequenceHeader_init(&sequenceHeader);
+    /* Decode header */
+    UA_SecureConversationMessageHeader header;
+    retval = UA_SecureConversationMessageHeader_decodeBinary(chunk, &offset, &header);
+    if(retval != UA_STATUSCODE_GOOD)
+        return retval;
 
-        if((header.messageHeader.messageTypeAndChunkType & 0x00ffffff) != UA_MESSAGETYPE_OPN) {
-            /* Check the symmetric security header (not for OPN) */
-            UA_UInt32 tokenId = 0;
-            retval |= UA_UInt32_decodeBinary(chunks, &offset, &tokenId);
-            retval |= UA_SequenceHeader_decodeBinary(chunks, &offset, &sequenceHeader);
-            if(retval != UA_STATUSCODE_GOOD)
-                return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    /* Is the channel attached to connection? */
+    if(header.secureChannelId != channel->securityToken.channelId) {
+        //Service_CloseSecureChannel(server, channel);
+        //connection->close(connection);
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
 
-            /* Does the token match? */
-            if(tokenId != channel->securityToken.tokenId) {
-                if(tokenId != channel->nextSecurityToken.tokenId)
-                    return UA_STATUSCODE_BADCOMMUNICATIONERROR;
-                UA_SecureChannel_revolveTokens(channel);
-            }
+    /* Use requestId = 0 with OPN as argument for the callback */
+    UA_SequenceHeader sequenceHeader;
+    UA_SequenceHeader_init(&sequenceHeader);
 
-            /* Does the sequence number match? */
-            retval = UA_SecureChannel_processSequenceNumber(channel, sequenceHeader.sequenceNumber);
-            if(retval != UA_STATUSCODE_GOOD)
-                return UA_STATUSCODE_BADCOMMUNICATIONERROR;
-        }
+    /* Check the symmetric security header (not for OPN) */
+    UA_MessageType messageType = header.messageHeader.messageTypeAndChunkType & 0x00ffffff;
+    if(messageType != UA_MESSAGETYPE_OPN) {
+        UA_UInt32 tokenId = 0;
+        retval |= UA_UInt32_decodeBinary(chunk, &offset, &tokenId);
+        retval |= UA_SequenceHeader_decodeBinary(chunk, &offset, &sequenceHeader);
+        if(retval != UA_STATUSCODE_GOOD)
+            return UA_STATUSCODE_BADCOMMUNICATIONERROR;
 
-        /* Process chunk */
-        size_t processed_header = offset - initial_offset;
-        switch(header.messageHeader.messageTypeAndChunkType & 0xff000000) {
-        case UA_CHUNKTYPE_INTERMEDIATE:
-            UA_SecureChannel_appendChunk(channel, sequenceHeader.requestId, chunks, offset,
-                                         header.messageHeader.messageSize - processed_header);
-            break;
-        case UA_CHUNKTYPE_FINAL: {
-            UA_Boolean realloced = false;
-            UA_ByteString message =
-                UA_SecureChannel_finalizeChunk(channel, sequenceHeader.requestId, chunks, offset,
-                                               header.messageHeader.messageSize - processed_header,
-                                               &realloced);
-            if(message.length > 0) {
-                callback(application,(UA_SecureChannel *)channel,(UA_MessageType)(header.messageHeader.messageTypeAndChunkType & 0x00ffffff),
-                         sequenceHeader.requestId, &message);
-                if(realloced)
-                    UA_ByteString_deleteMembers(&message);
-            }
-            break; }
-        case UA_CHUNKTYPE_ABORT:
-            UA_SecureChannel_removeChunk(channel, sequenceHeader.requestId);
-            break;
-        default:
-            return UA_STATUSCODE_BADDECODINGERROR;
+        /* Does the token match? */
+        if(tokenId != channel->securityToken.tokenId) {
+            if(tokenId != channel->nextSecurityToken.tokenId)
+                return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+            UA_SecureChannel_revolveTokens(channel);
         }
 
-        /* Jump to the end of the chunk */
-        offset += (header.messageHeader.messageSize - processed_header);
-    } while(chunks->length > offset);
+        /* Does the sequence number match? */
+        retval = UA_SecureChannel_processSequenceNumber(channel, sequenceHeader.sequenceNumber);
+        if(retval != UA_STATUSCODE_GOOD)
+            return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
 
+    /* At least one byte of payload required */
+    if(offset >= chunk->length)
+        retval = UA_STATUSCODE_BADDECODINGERROR;
+
+    /* Process chunk */
+    switch(header.messageHeader.messageTypeAndChunkType & 0xff000000) {
+    case UA_CHUNKTYPE_INTERMEDIATE:
+        UA_SecureChannel_appendChunk(channel, sequenceHeader.requestId, chunk, offset);
+        break;
+    case UA_CHUNKTYPE_FINAL:
+        UA_SecureChannel_finalizeChunk(channel, sequenceHeader.requestId, chunk, offset,
+                                       messageType, callback, application);
+        break;
+    case UA_CHUNKTYPE_ABORT:
+        UA_SecureChannel_removeChunks(channel, sequenceHeader.requestId);
+        break;
+    default:
+        retval = UA_STATUSCODE_BADDECODINGERROR;
+    }
     return retval;
 }

+ 4 - 4
src/ua_securechannel.h

@@ -76,12 +76,12 @@ void UA_SecureChannel_revolveTokens(UA_SecureChannel *channel);
  * -------- */
 typedef void
 (UA_ProcessMessageCallback)(void *application, UA_SecureChannel *channel,
-                             UA_MessageType messageType, UA_UInt32 requestId,
-                             const UA_ByteString *message);
+                            UA_MessageType messageType, UA_UInt32 requestId,
+                            const UA_ByteString *message);
 
 UA_StatusCode
-UA_SecureChannel_processChunks(UA_SecureChannel *channel, const UA_ByteString *chunks,
-                               UA_ProcessMessageCallback callback, void *application);
+UA_SecureChannel_processChunk(UA_SecureChannel *channel, const UA_ByteString *chunk,
+                              UA_ProcessMessageCallback callback, void *application);
 
 /**
  * Log Helper

+ 1 - 4
tests/check_server_binary_messages.c

@@ -39,10 +39,7 @@ START_TEST(processMessage) {
     UA_Server *server = UA_Server_new(config);
     for(size_t i = 0; i < files; i++) {
         UA_ByteString msg = readFile(filenames[i]);
-        UA_Boolean reallocated;
-        UA_StatusCode retval = UA_Connection_completeChunks(&c, &msg, &reallocated);
-        if(retval == UA_STATUSCODE_GOOD && msg.length > 0)
-            UA_Server_processBinaryMessage(server, &c, &msg);
+        UA_Server_processBinaryMessage(server, &c, &msg);
         UA_ByteString_deleteMembers(&msg);
     }
     UA_Server_delete(server);