Explorar el Código

Merge branch 'jgilje-chunked_receive'

Julius Pfrommer hace 9 años
padre
commit
c6879e4c6d
Se han modificado 3 ficheros con 128 adiciones y 9 borrados
  1. 112 9
      src/server/ua_server_binary.c
  2. 8 0
      src/ua_securechannel.c
  3. 8 0
      src/ua_securechannel.h

+ 112 - 9
src/server/ua_server_binary.c

@@ -310,6 +310,48 @@ sendError(UA_SecureChannel *channel, const UA_ByteString *msg, size_t pos,
     UA_ResponseHeader_deleteMembers(&r);
 }
 
+static void
+appendChunkedMessage(struct ChunkEntry *ch, const UA_ByteString *msg, size_t *pos) {
+    if (ch->invalid_message) {
+        return;
+    }
+
+    UA_UInt32 len;
+    *pos -= 20;
+    UA_UInt32_decodeBinary(msg, pos, &len);
+    if (len > msg->length) {
+        UA_ByteString_deleteMembers(&ch->bytes);
+        ch->invalid_message = UA_TRUE;
+        return;
+    }
+    len -= 24;
+    *pos += 16; // 4 bytes consumed by decode above
+
+    UA_Byte* new_bytes = UA_realloc(ch->bytes.data, ch->bytes.length + len);
+    if (! new_bytes) {
+        UA_ByteString_deleteMembers(&ch->bytes);
+        ch->invalid_message = UA_TRUE;
+        return;
+    }
+    ch->bytes.data = new_bytes;
+
+    memcpy(&ch->bytes.data[ch->bytes.length], &msg->data[*pos], len);
+    ch->bytes.length += len;
+    *pos += len;
+}
+
+static struct ChunkEntry*
+chunkEntryFromRequestId(UA_SecureChannel *channel, UA_UInt32 requestId) {
+    struct ChunkEntry *ch;
+    LIST_FOREACH(ch, &channel->chunks, pointers) {
+        if (ch->requestId == requestId) {
+            return ch;
+        }
+    }
+
+    return NULL;
+}
+
 static void
 processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *msg, size_t *pos) {
     /* If we cannot decode these, don't respond */
@@ -320,7 +362,6 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
     UA_StatusCode retval = UA_UInt32_decodeBinary(msg, pos, &secureChannelId);
     retval |= UA_UInt32_decodeBinary(msg, pos, &tokenId);
     retval |= UA_SequenceHeader_decodeBinary(msg, pos, &sequenceHeader);
-    retval |= UA_NodeId_decodeBinary(msg, pos, &requestTypeId);
     if(retval != UA_STATUSCODE_GOOD)
         return;
 
@@ -347,11 +388,68 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
         UA_SecureChannel_revolveTokens(channel);
     }
 
+    size_t final_chunked_pos = 0;
+    UA_ByteString bytes;
+    struct ChunkEntry *ch;
+    switch (msg->data[*pos - 24 + 3]) {
+    case 'C':
+        UA_LOG_TRACE(server->config.logger, UA_LOGCATEGORY_SECURECHANNEL, "Chunk message");
+        ch = chunkEntryFromRequestId(channel, sequenceHeader.requestId);
+        if (! ch) {
+            ch = UA_calloc(1, sizeof(struct ChunkEntry));
+            ch->invalid_message = UA_FALSE;
+            ch->requestId = sequenceHeader.requestId;
+            UA_ByteString_init(&ch->bytes);
+            LIST_INSERT_HEAD(&channel->chunks, ch, pointers);
+        }
+
+        appendChunkedMessage(ch, msg, pos);
+        return;
+    case 'F':
+        ch = chunkEntryFromRequestId(channel, sequenceHeader.requestId);
+        if (ch) {
+            UA_LOG_TRACE(server->config.logger, UA_LOGCATEGORY_SECURECHANNEL, "Final chunk message");
+            appendChunkedMessage(ch, msg, pos);
+
+            bytes = ch->bytes;
+            LIST_REMOVE(ch, pointers);
+            UA_free(ch);
+
+            final_chunked_pos = *pos;
+            *pos = 0;
+
+            // if the chunks have failed decoding
+            // message is invalid => return early
+            if (bytes.length == 0) {
+                *pos = final_chunked_pos;
+                return;
+            }
+        } else {
+            bytes = *msg;
+        }
+        break;
+    case 'A':
+        ch = chunkEntryFromRequestId(channel, sequenceHeader.requestId);
+        if (ch) {
+            UA_ByteString_deleteMembers(&ch->bytes);
+            LIST_REMOVE(ch, pointers);
+            UA_free(ch);
+        } else {
+            UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SECURECHANNEL, "Received MSGA on an unknown request");
+        }
+
+        return;
+    }
+
+    retval |= UA_NodeId_decodeBinary(&bytes, pos, &requestTypeId);
+    if(retval != UA_STATUSCODE_GOOD)
+        return;
+
     /* Test if the service type nodeid has the right format */
     if(requestTypeId.identifierType != UA_NODEIDTYPE_NUMERIC ||
        requestTypeId.namespaceIndex != 0) {
         UA_NodeId_deleteMembers(&requestTypeId);
-        sendError(channel, msg, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSERVICEUNSUPPORTED);
+        sendError(channel, &bytes, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSERVICEUNSUPPORTED);
         return;
     }
 
@@ -370,7 +468,7 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
             UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
                         "Unknown request: NodeId(ns=%d, i=%d)",
                         requestTypeId.namespaceIndex, requestTypeId.identifier.numeric);
-        sendError(channel, msg, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSERVICEUNSUPPORTED);
+        sendError(channel, &bytes, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSERVICEUNSUPPORTED);
         return;
     }
 
@@ -378,7 +476,7 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
 #ifndef UA_ENABLE_NONSTANDARD_STATELESS
     if(channel == &anonymousChannel &&
        requestType->typeIndex > UA_TYPES_OPENSECURECHANNELREQUEST) {
-        sendError(channel, msg, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSECURECHANNELIDINVALID);
+        sendError(channel, &bytes, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSECURECHANNELIDINVALID);
         return;
     }
 #endif
@@ -386,9 +484,9 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
     /* Decode the request */
     void *request = UA_alloca(requestType->memSize);
     size_t oldpos = *pos;
-    retval = UA_decodeBinary(msg, pos, request, requestType);
+    retval = UA_decodeBinary(&bytes, pos, request, requestType);
     if(retval != UA_STATUSCODE_GOOD) {
-        sendError(channel, msg, oldpos, sequenceHeader.requestId, retval);
+        sendError(channel, &bytes, oldpos, sequenceHeader.requestId, retval);
         return;
     }
 
@@ -408,7 +506,7 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
     if(!session->activated && requestType->typeIndex != UA_TYPES_ACTIVATESESSIONREQUEST) {
         UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
                     "Client tries to call a service with a non-activated session");
-        sendError(channel, msg, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSESSIONNOTACTIVATED);
+        sendError(channel, &bytes, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSESSIONNOTACTIVATED);
         return;
     }
 #ifndef UA_ENABLE_NONSTANDARD_STATELESS
@@ -416,7 +514,7 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
        requestType->typeIndex > UA_TYPES_ACTIVATESESSIONREQUEST) {
         UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
                     "Client tries to call a service without a session");
-        sendError(channel, msg, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSESSIONIDINVALID);
+        sendError(channel, &bytes, *pos, sequenceHeader.requestId, UA_STATUSCODE_BADSESSIONIDINVALID);
         return;
     }
 #endif
@@ -443,10 +541,15 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
                                                 response, responseType);
     if(retval != UA_STATUSCODE_GOOD) {
         /* e.g. UA_STATUSCODE_BADENCODINGLIMITSEXCEEDED */
-        sendError(channel, msg, oldpos, sequenceHeader.requestId, retval);
+        sendError(channel, &bytes, oldpos, sequenceHeader.requestId, retval);
     }
 
     /* Clean up */
+    if (final_chunked_pos) {
+        *pos = final_chunked_pos;
+        UA_ByteString_deleteMembers(&bytes);
+    }
+
     UA_deleteMembers(request, requestType);
     UA_deleteMembers(response, responseType);
     return;

+ 8 - 0
src/ua_securechannel.c

@@ -16,6 +16,7 @@ void UA_SecureChannel_init(UA_SecureChannel *channel) {
     channel->sequenceNumber = 0;
     channel->connection = NULL;
     LIST_INIT(&channel->sessions);
+    LIST_INIT(&channel->chunks);
 }
 
 void UA_SecureChannel_deleteMembersCleanup(UA_SecureChannel *channel) {
@@ -39,6 +40,13 @@ void UA_SecureChannel_deleteMembersCleanup(UA_SecureChannel *channel) {
         LIST_REMOVE(se, pointers);
         UA_free(se);
     }
+
+    struct ChunkEntry *ch, *temp_ch;
+    LIST_FOREACH_SAFE(ch, &channel->chunks, pointers, temp_ch) {
+        UA_ByteString_deleteMembers(&ch->bytes);
+        LIST_REMOVE(ch, pointers);
+        UA_free(ch);
+    }
 }
 
 //TODO implement real nonce generator - DUMMY function

+ 8 - 0
src/ua_securechannel.h

@@ -20,6 +20,13 @@ struct SessionEntry {
     UA_Session *session; // Just a pointer. The session is held in the session manager or the client
 };
 
+struct ChunkEntry {
+    LIST_ENTRY(ChunkEntry) pointers;
+    UA_UInt32 requestId;
+    UA_Boolean invalid_message;
+    UA_ByteString bytes;
+};
+
 struct UA_SecureChannel {
     UA_MessageSecurityMode  securityMode;
     UA_ChannelSecurityToken securityToken; // the channelId is contained in the securityToken
@@ -31,6 +38,7 @@ struct UA_SecureChannel {
     UA_UInt32      sequenceNumber;
     UA_Connection *connection;
     LIST_HEAD(session_pointerlist, SessionEntry) sessions;
+    LIST_HEAD(chunk_pointerlist, ChunkEntry) chunks;
 };
 
 void UA_SecureChannel_init(UA_SecureChannel *channel);