浏览代码

store incompletely received messages in the connection

Julius Pfrommer 10 年之前
父节点
当前提交
2f1531bae6
共有 5 个文件被更改,包括 92 次插入65 次删除
  1. 6 52
      examples/networklayer_tcp.c
  2. 5 0
      include/ua_connection.h
  3. 14 9
      src/client/ua_client.c
  4. 65 0
      src/ua_securechannel.c
  5. 2 4
      src/ua_types.c

+ 6 - 52
examples/networklayer_tcp.c

@@ -32,20 +32,10 @@
 
 #include "networklayer_tcp.h" // UA_MULTITHREADING is defined in here
 
-#ifdef NOT_AMALGATED
-	#include "ua_types.h" //TODO: this is a hack - refactor
-	#include "ua_transport_generated.h" //TODO: this is a hack - refactor
-	#include "ua_types_encoding_binary.h" //TODO: this is a hack - refactor
-#else
-    #include "open62541.h"
-#endif
-
 #ifdef UA_MULTITHREADING
 #include <urcu/uatomic.h>
 #endif
 
-
-
 struct ServerNetworklayer_TCP;
 
 /* Forwarded to the server as a (UA_Connection) and used for callbacks back into
@@ -576,50 +566,14 @@ static UA_StatusCode ClientNetworkLayerTCP_send(ClientNetworkLayerTCP *handle, U
     return UA_STATUSCODE_GOOD;
 }
 
-static UA_StatusCode ClientNetworkLayerTCP_awaitResponse(ClientNetworkLayerTCP *handle, UA_ByteString *response,
-                                                         UA_UInt32 timeout) {
-    //FD_ZERO(&handle->read_fds);
-    //FD_SET(handle->sockfd, &handle->read_fds);//tcp socket
+static UA_StatusCode ClientNetworkLayerTCP_awaitResponse(ClientNetworkLayerTCP *handle,
+                                                         UA_ByteString *response, UA_UInt32 timeout) {
     struct timeval tmptv = {0, timeout};
-    /*int ret = select(handle->sockfd+1, &handle->read_fds, NULL, NULL, &tmptv);
-    if(ret <= -1)
-        return UA_STATUSCODE_BADINTERNALERROR;
-    if(ret == 0)
-        return UA_STATUSCODE_BADTIMEOUT;*/
-
     setsockopt(handle->sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tmptv,sizeof(struct timeval));
-
-    int ret = 0;
-
-    unsigned int already_received = 0;
-    UA_SecureConversationMessageHeader msgHeader;
-
-    do{
-    	if(already_received>0 || ret <= -1)Sleep(1); //1ms
-
-    	ret = recv(handle->sockfd, (char*)(response->data+already_received), response->length-already_received, 0);
-
-    	if(ret <= -1){
-    		continue;
-    	}
-    	if(ret == 0)
-    		return UA_STATUSCODE_BADSERVERNOTCONNECTED;
-
-    	//FIXME: receive even more
-    	if(ret <= 4){
-    		return UA_STATUSCODE_BADINTERNALERROR;
-    	}
-
-    	already_received+=ret;
-
-    	size_t offset = 0;
-    	//let us try to decode the length of the real message
-    	UA_SecureConversationMessageHeader_decodeBinary(response, &offset, &msgHeader);
-    	//printf("ret %d, length %d, already recv %d\n", ret, msgHeader.messageHeader.messageSize, already_received);
-    }while(msgHeader.messageHeader.messageSize == 0 || already_received < msgHeader.messageHeader.messageSize);
-
-    response->length = already_received;
-
+    int ret = recv(handle->sockfd, (char*)response->data, response->length, 0);
+    if(ret == 0)
+        return UA_STATUSCODE_BADSERVERNOTCONNECTED;
+    response->length = ret;
     return UA_STATUSCODE_GOOD;
 }
 

+ 5 - 0
include/ua_connection.h

@@ -62,11 +62,16 @@ typedef struct UA_Connection {
     UA_SecureChannel   *channel;
     void (*write)(void *connection, UA_ByteStringArray buf);
     void (*close)(void *connection);
+    UA_ByteString incompleteMessage;
 } UA_Connection;
 
 void UA_EXPORT UA_Connection_detachSecureChannel(UA_Connection *connection);
 // void UA_Connection_attachSecureChannel(UA_Connection *connection);
 
+/** Returns a string of complete message (the length entry is decoded for that).
+    If the received message is incomplete, it is retained in the connection. */
+UA_ByteString UA_EXPORT UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString received);
+
 /** @} */
 
 #ifdef __cplusplus

+ 14 - 9
src/client/ua_client.c

@@ -5,6 +5,7 @@
 #include "ua_transport_generated.h"
 
 struct UA_Client {
+    /* Connection */
     UA_ClientNetworkLayer networkLayer;
     UA_String endpointUrl;
     UA_Connection connection;
@@ -30,6 +31,7 @@ UA_Client * UA_Client_new(void) {
         return UA_NULL;
     UA_String_init(&client->endpointUrl);
     client->connection.state = UA_CONNECTION_OPENING;
+    UA_ByteString_init(&client->connection.incompleteMessage);
 
     client->sequenceNumber = 0;
     client->requestId = 0;
@@ -70,8 +72,8 @@ static UA_StatusCode HelAckHandshake(UA_Client *c) {
 	hello.receiveBufferSize = conn->localConf.recvBufferSize;
 	hello.sendBufferSize = conn->localConf.sendBufferSize;
 
-	messageHeader.messageSize = UA_TcpHelloMessage_calcSizeBinary((UA_TcpHelloMessage const*) &hello) +
-                                UA_TcpMessageHeader_calcSizeBinary((UA_TcpMessageHeader const*) &messageHeader);
+	messageHeader.messageSize = UA_TcpHelloMessage_calcSizeBinary((UA_TcpHelloMessage const*)&hello) +
+                                UA_TcpMessageHeader_calcSizeBinary((UA_TcpMessageHeader const*)&messageHeader);
 	UA_ByteString message;
     message.data = UA_alloca(messageHeader.messageSize);
     message.length = messageHeader.messageSize;
@@ -273,13 +275,16 @@ static void sendReceiveRequest(UA_RequestHeader *request, const UA_DataType *req
 
     /* Response */
     UA_ByteString reply;
-    UA_ByteString_newMembers(&reply, client->connection.localConf.recvBufferSize);
-    retval = client->networkLayer.awaitResponse(client->networkLayer.nlHandle, &reply, 1000);
-    if(retval != UA_STATUSCODE_GOOD) {
-        UA_ByteString_deleteMembers(&reply);
-        respHeader->serviceResult = retval;
-        return;
-    }
+    do {
+        UA_ByteString_newMembers(&reply, client->connection.localConf.recvBufferSize);
+        retval = client->networkLayer.awaitResponse(client->networkLayer.nlHandle, &reply, 1000);
+        if(retval != UA_STATUSCODE_GOOD) {
+            UA_ByteString_deleteMembers(&reply);
+            respHeader->serviceResult = retval;
+            return;
+        }
+        reply = UA_Connection_completeMessages(&client->connection, reply);
+    } while(reply.length < 0);
 
 	offset = 0;
 	retval |= UA_SecureConversationMessageHeader_decodeBinary(&reply, &offset, &msgHeader);

+ 65 - 0
src/ua_securechannel.c

@@ -1,12 +1,77 @@
 #include "ua_util.h"
 #include "ua_securechannel.h"
 #include "ua_statuscodes.h"
+#include "ua_types_encoding_binary.h"
 
 // max message size is 64k
 const UA_ConnectionConfig UA_ConnectionConfig_standard =
     {.protocolVersion = 0, .sendBufferSize = 65536, .recvBufferSize  = 65536,
      .maxMessageSize = 65536, .maxChunkCount   = 1};
 
+UA_ByteString UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString received)
+{
+    /* concat received to the incomplete message we have */
+    UA_ByteString current;
+    if(connection->incompleteMessage.length < 0)
+        current = received;
+    else {
+        UA_Byte *longer = UA_realloc(connection->incompleteMessage.data,
+                                     connection->incompleteMessage.length + received.length);
+        if(!longer) {
+            UA_ByteString_deleteMembers(&received);
+            UA_ByteString_deleteMembers(&connection->incompleteMessage);
+            connection->incompleteMessage.length = -1;
+            received.length = -1;
+            return received;
+        }
+        UA_memcpy(&longer[connection->incompleteMessage.length], received.data, received.length);
+        current.data = longer;
+        current.length = connection->incompleteMessage.length + received.length;
+    }
+
+    /* find the first non-complete message */
+    size_t end_pos = 0;
+    while(current.length - end_pos >= 32) {
+        if(!(current.data[0] == 'M' && current.data[1] == 'S' && current.data[2] == 'G') &&
+           !(current.data[0] == 'O' && current.data[1] == 'P' && current.data[2] == 'N') &&
+           !(current.data[0] == 'H' && current.data[1] == 'E' && current.data[2] == 'L') &&
+           !(current.data[0] == 'A' && current.data[1] == 'C' && current.data[2] == 'K') &&
+           !(current.data[0] == 'C' && current.data[1] == 'L' && current.data[2] == 'O')) {
+            current.length = end_pos; // throw the remaining bytestring away
+            break;
+        }
+        UA_Int32 length;
+        size_t pos = end_pos + 4;
+        UA_Int32_decodeBinary(&received, &pos, &length);
+        if(length < 32 || length > (UA_Int32)connection->localConf.maxMessageSize) {
+            current.length = end_pos; // throw the remaining bytestring away
+            break;
+        }
+        if(length + (UA_Int32)end_pos > current.length)
+            break; // the message is not complete
+        end_pos += length;
+    }
+
+    /* return all complete messages, retain the (last) incomplete one */
+    if(current.length == 0) {
+        UA_String_deleteMembers(&current);
+        current.length = -1;
+    } else {
+        if(current.length - end_pos > 0) {
+            connection->incompleteMessage.data = UA_malloc(current.length - end_pos);
+            if(!connection->incompleteMessage.data)
+                UA_ByteString_init(&connection->incompleteMessage);
+            else {
+                UA_memcpy(&current.data[end_pos], connection->incompleteMessage.data,
+                          current.length - end_pos);
+                connection->incompleteMessage.length = current.length - end_pos;
+            }
+        }
+        current.length = end_pos;
+    }
+    return current;
+}
+
 void UA_SecureChannel_init(UA_SecureChannel *channel) {
     UA_MessageSecurityMode_init(&channel->securityMode);
     UA_ChannelSecurityToken_init(&channel->securityToken);

+ 2 - 4
src/ua_types.c

@@ -88,10 +88,8 @@ void UA_String_init(UA_String *p) {
 
 UA_TYPE_DELETE_DEFAULT(UA_String)
 void UA_String_deleteMembers(UA_String *p) {
-	if(p->data) {
-		UA_free(p->data);
-        p->data = UA_NULL;
-    }
+    UA_free(p->data);
+    p->data = UA_NULL;
 }
 
 UA_StatusCode UA_String_copy(UA_String const *src, UA_String *dst) {