瀏覽代碼

improve the client; fix remaining memleaks; wait for complete messages in the network layer

Julius Pfrommer 9 年之前
父節點
當前提交
23ed641bcc

+ 11 - 2
examples/networklayer_tcp.c

@@ -116,7 +116,7 @@ socket_recv(UA_Connection *connection, UA_ByteString *response, UA_UInt32 timeou
 #else
 		if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
 #endif
-            return UA_STATUSCODE_BADINTERNALERROR; /* retry */
+            return UA_STATUSCODE_GOOD; /* retry */
         else {
             socket_close(connection);
             return UA_STATUSCODE_BADCONNECTIONCLOSED;
@@ -357,7 +357,16 @@ ServerNetworkLayerTCP_getJobs(ServerNetworkLayerTCP *layer, UA_Job **jobs, UA_UI
             continue;
         UA_StatusCode retval = socket_recv(layer->mappings[i].connection, &buf, 0);
         if(retval == UA_STATUSCODE_GOOD) {
-            js[j] = UA_Connection_completeMessages(layer->mappings[i].connection, buf);
+            UA_Boolean realloced = UA_FALSE;
+            retval = UA_Connection_completeMessages(layer->mappings[i].connection, &buf, &realloced);
+            if(retval != UA_STATUSCODE_GOOD || buf.length == 0)
+                continue;
+            js[j].job.binaryMessage.connection = layer->mappings[i].connection;
+            js[j].job.binaryMessage.message = buf;
+            if(!realloced)
+                js[j].type = UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER;
+            else
+                js[j].type = UA_JOBTYPE_BINARYMESSAGE_ALLOCATED;
             j++;
         } else if (retval == UA_STATUSCODE_BADCONNECTIONCLOSED) {
             UA_Connection *c = layer->mappings[i].connection;

+ 21 - 5
include/ua_connection.h

@@ -97,11 +97,27 @@ void UA_EXPORT UA_Connection_deleteMembers(UA_Connection *connection);
 void UA_EXPORT UA_Connection_detachSecureChannel(UA_Connection *connection);
 void UA_EXPORT UA_Connection_attachSecureChannel(UA_Connection *connection, UA_SecureChannel *channel);
 
-/** Returns a job that contains either a message-bytestring managed by the network layer or a
-    message-bytestring that was newly allocated (or a nothing-job). Half-received messages are
-    attached to the connection. The next completion tries to create a complete message with the next
-    buffer the connection receives. */
-UA_Job UA_EXPORT UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString received);
+/**
+ * The network layer may receive chopped up messages since TCP is a streaming
+ * protocol. Furthermore, the networklayer may operate on ringbuffers or
+ * statically assigned memory.
+ *
+ * If an entire message is received, it is forwarded directly. But the memory
+ * needs to be freed with the networklayer-specific mechanism. If a half message
+ * is received, we copy it into a local buffer. Then, the stack-specific free
+ * needs to be used.
+ *
+ * @param connection The connection
+ * @param message The received message. The content may be overwritten when a
+ *        previsouly received buffer is completed.
+ * @param realloced The Boolean value is set to true if the outgoing message has
+ *        been reallocated from the network layer.
+ * @return Returns UA_STATUSCODE_GOOD or an error code. When an error occurs, the ingoing message
+ *         and the current buffer in the connection are freed.
+ */
+UA_StatusCode UA_EXPORT
+UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString * UA_RESTRICT message,
+                               UA_Boolean * UA_RESTRICT realloced);
 
 #ifdef __cplusplus
 } // extern "C"

+ 47 - 14
src/client/ua_client.c

@@ -1,5 +1,6 @@
 #include "ua_util.h"
 #include "ua_client.h"
+#include "ua_client_highlevel.h"
 #include "ua_client_internal.h"
 #include "ua_types_generated.h"
 #include "ua_nodeids.h"
@@ -50,13 +51,30 @@ UA_Client * UA_Client_new(UA_ClientConfig config, UA_Logger logger) {
     return client;
 }
 
-static void UA_Client_deleteMembers(UA_Client* client){
+static void UA_Client_deleteMembers(UA_Client* client) {
     UA_Client_disconnect(client);
     UA_Connection_deleteMembers(&client->connection);
     UA_SecureChannel_deleteMembersCleanup(&client->channel);
     if(client->endpointUrl.data)
         UA_String_deleteMembers(&client->endpointUrl);
     UA_UserTokenPolicy_deleteMembers(&client->token);
+#ifdef ENABLE_SUBSCRIPTIONS
+    UA_Client_NotificationsAckNumber *n, *tmp;
+    LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
+        LIST_REMOVE(n, listEntry);
+        free(n);
+    }
+    UA_Client_Subscription *sub, *tmps;
+    LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps) {
+        LIST_REMOVE(sub, listEntry);
+        UA_Client_MonitoredItem *mon, *tmpmon;
+        LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmpmon) {
+            UA_Client_Subscriptions_removeMonitoredItem(client, sub->SubscriptionID,
+                                                        mon->MonitoredItemId);
+        }
+        free(sub);
+    }
+#endif
 }
 
 void UA_Client_reset(UA_Client* client){
@@ -108,32 +126,38 @@ static UA_StatusCode HelAckHandshake(UA_Client *c) {
     message.length = messageHeader.messageSize;
     retval = c->connection.send(&c->connection, &message);
     if(retval != UA_STATUSCODE_GOOD) {
-        UA_LOG_DEBUG(c->logger, UA_LOGCATEGORY_NETWORK, "Sending HEL failed");
+        UA_LOG_INFO(c->logger, UA_LOGCATEGORY_NETWORK, "Sending HEL failed");
         return retval;
     }
     UA_LOG_DEBUG(c->logger, UA_LOGCATEGORY_NETWORK, "Sent HEL message");
 
     UA_ByteString reply;
     UA_ByteString_init(&reply);
+    UA_Boolean realloced = UA_FALSE;
     do {
         retval = c->connection.recv(&c->connection, &reply, c->config.timeout);
+        retval |= UA_Connection_completeMessages(&c->connection, &reply, &realloced);
         if(retval != UA_STATUSCODE_GOOD) {
-            UA_LOG_DEBUG(c->logger, UA_LOGCATEGORY_NETWORK, "Receiving ACK message failed");
+            UA_LOG_INFO(c->logger, UA_LOGCATEGORY_NETWORK, "Receiving ACK message failed");
             return retval;
         }
-    } while(!reply.data);
+    } while(reply.length == 0);
 
     offset = 0;
     UA_TcpMessageHeader_decodeBinary(&reply, &offset, &messageHeader);
     UA_TcpAcknowledgeMessage ackMessage;
     retval = UA_TcpAcknowledgeMessage_decodeBinary(&reply, &offset, &ackMessage);
-    UA_ByteString_deleteMembers(&reply);
+    if(!realloced)
+        c->connection.releaseRecvBuffer(&c->connection, &reply);
+    else
+        UA_ByteString_deleteMembers(&reply);
+
     if(retval != UA_STATUSCODE_GOOD) {
-        UA_LOG_DEBUG(c->logger, UA_LOGCATEGORY_NETWORK, "Decoding ACK message failed");
+        UA_LOG_INFO(c->logger, UA_LOGCATEGORY_NETWORK, "Decoding ACK message failed");
         return retval;
     }
-
     UA_LOG_DEBUG(c->logger, UA_LOGCATEGORY_NETWORK, "Received ACK message");
+
     conn->remoteConf.maxChunkCount = ackMessage.maxChunkCount;
     conn->remoteConf.maxMessageSize = ackMessage.maxMessageSize;
     conn->remoteConf.protocolVersion = ackMessage.protocolVersion;
@@ -214,14 +238,16 @@ static UA_StatusCode SecureChannelHandshake(UA_Client *client, UA_Boolean renew)
 
     UA_ByteString reply;
     UA_ByteString_init(&reply);
+    UA_Boolean realloced = UA_FALSE;
     do {
-        retval = client->connection.recv(&client->connection, &reply, client->config.timeout);
+        retval = c->recv(c, &reply, client->config.timeout);
+        retval |= UA_Connection_completeMessages(c, &reply, &realloced);
         if(retval != UA_STATUSCODE_GOOD) {
             UA_LOG_DEBUG(client->logger, UA_LOGCATEGORY_SECURECHANNEL,
                          "Receiving OpenSecureChannelResponse failed");
             return retval;
         }
-    } while(!reply.data);
+    } while(reply.length == 0);
 
     offset = 0;
     UA_SecureConversationMessageHeader_decodeBinary(&reply, &offset, &messageHeader);
@@ -242,10 +268,14 @@ static UA_StatusCode SecureChannelHandshake(UA_Client *client, UA_Boolean renew)
     UA_OpenSecureChannelResponse response;
     UA_OpenSecureChannelResponse_init(&response);
     retval = UA_OpenSecureChannelResponse_decodeBinary(&reply, &offset, &response);
+    if(!realloced)
+        c->releaseRecvBuffer(c, &reply);
+    else
+        UA_ByteString_deleteMembers(&reply);
+        
     if(retval != UA_STATUSCODE_GOOD) {
         UA_LOG_DEBUG(client->logger, UA_LOGCATEGORY_SECURECHANNEL,
                      "Decoding OpenSecureChannelResponse failed");
-        UA_ByteString_deleteMembers(&reply);
         UA_AsymmetricAlgorithmSecurityHeader_deleteMembers(&asymHeader);
         UA_OpenSecureChannelResponse_init(&response);
         response.responseHeader.serviceResult = retval;
@@ -253,7 +283,6 @@ static UA_StatusCode SecureChannelHandshake(UA_Client *client, UA_Boolean renew)
     }
 
     client->scExpiresAt = UA_DateTime_now() + response.securityToken.revisedLifetime * 10000;
-    UA_ByteString_deleteMembers(&reply);
     retval = response.responseHeader.serviceResult;
 
     if(retval != UA_STATUSCODE_GOOD)
@@ -546,8 +575,10 @@ void __UA_Client_Service(UA_Client *client, const void *r, const UA_DataType *re
     // Todo: push this into the generic securechannel implementation for client and server
     UA_ByteString reply;
     UA_ByteString_init(&reply);
+    UA_Boolean realloced = UA_FALSE;
     do {
         retval = client->connection.recv(&client->connection, &reply, client->config.timeout);
+        retval |= UA_Connection_completeMessages(&client->connection, &reply, &realloced);
         if(retval != UA_STATUSCODE_GOOD) {
             respHeader->serviceResult = retval;
             client->state = UA_CLIENTSTATE_ERRORED;
@@ -567,9 +598,8 @@ void __UA_Client_Service(UA_Client *client, const void *r, const UA_DataType *re
     UA_NodeId expectedNodeId = UA_NODEID_NUMERIC(0, responseType->typeId.identifier.numeric +
                                                  UA_ENCODINGOFFSET_BINARY);
 
-    if(retval != UA_STATUSCODE_GOOD) {
+    if(retval != UA_STATUSCODE_GOOD)
         goto finish;
-    }
 
     /* Todo: we need to demux responses since a publish responses may come at any time */
     if(!UA_NodeId_equal(&responseId, &expectedNodeId) || seqHeader.requestId != requestId) {
@@ -590,7 +620,10 @@ void __UA_Client_Service(UA_Client *client, const void *r, const UA_DataType *re
 
  finish:
     UA_SymmetricAlgorithmSecurityHeader_deleteMembers(&symHeader);
-    UA_ByteString_deleteMembers(&reply);
+    if(!realloced)
+        client->connection.releaseRecvBuffer(&client->connection, &reply);
+    else
+        UA_ByteString_deleteMembers(&reply);
     if(retval != UA_STATUSCODE_GOOD){
         UA_LOG_INFO(client->logger, UA_LOGCATEGORY_CLIENT, "Error receiving the response");
         client->state = UA_CLIENTSTATE_ERRORED;

+ 4 - 2
src/client/ua_client_highlevel.c

@@ -164,11 +164,13 @@ __UA_Client_addNode(UA_Client *client, const UA_NodeClass nodeClass, const UA_No
         UA_AddNodesResponse_deleteMembers(&response);
         return UA_STATUSCODE_BADUNEXPECTEDERROR;
     }
-    if(outNewNodeId && response.results[0].statusCode) {
+    if(outNewNodeId && response.results[0].statusCode == UA_STATUSCODE_GOOD) {
         *outNewNodeId = response.results[0].addedNodeId;
         UA_NodeId_init(&response.results[0].addedNodeId);
     }
-    return response.results[0].statusCode;
+    retval = response.results[0].statusCode;
+    UA_AddNodesResponse_deleteMembers(&response);
+    return retval;
 }
 
 /********/

+ 2 - 2
src/client/ua_client_highlevel_subscriptions.c

@@ -57,7 +57,7 @@ UA_StatusCode UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscr
     if(!sub)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
     
-    UA_DeleteSubscriptionsRequest  request;
+    UA_DeleteSubscriptionsRequest request;
     UA_DeleteSubscriptionsRequest_init(&request);
     request.subscriptionIdsSize = 1;
     request.subscriptionIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
@@ -68,7 +68,7 @@ UA_StatusCode UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscr
         retval |= UA_Client_Subscriptions_removeMonitoredItem(client, sub->SubscriptionID,
                                                               mon->MonitoredItemId);
     }
-    if(retval != UA_STATUSCODE_GOOD){
+    if(retval != UA_STATUSCODE_GOOD) {
 	    UA_DeleteSubscriptionsRequest_deleteMembers(&request);
         return retval;
     }

+ 4 - 5
src/server/ua_server_binary.c

@@ -159,13 +159,12 @@ static void invoke_service(UA_Server *server, UA_SecureChannel *channel, UA_UInt
     if(request->authenticationToken.namespaceIndex == 0
             && request->authenticationToken.identifierType == UA_NODEIDTYPE_NUMERIC
             && request->authenticationToken.identifier.numeric == 0
-    && (responseType->typeIndex == UA_TYPES_READRESPONSE
-            || responseType->typeIndex == UA_TYPES_WRITERESPONSE
-            || responseType->typeIndex == UA_TYPES_BROWSERESPONSE)
-    ){
+       && (responseType->typeIndex == UA_TYPES_READRESPONSE
+           || responseType->typeIndex == UA_TYPES_WRITERESPONSE
+           || responseType->typeIndex == UA_TYPES_BROWSERESPONSE)) {
         session = &anonymousSession;
         service(server, session, request, response);
-    }else{
+    } else {
 #endif
     if(!session || session->channel != channel) {
         response->serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;

+ 64 - 53
src/ua_connection.c

@@ -29,93 +29,104 @@ void UA_Connection_deleteMembers(UA_Connection *connection) {
     UA_ByteString_deleteMembers(&connection->incompleteMessage);
 }
 
-UA_Job UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString received) {
-    UA_Job job = (UA_Job){.type = UA_JOBTYPE_NOTHING};
-    UA_ByteString current;
-    if(connection->incompleteMessage.length <= 0)
-        current = received;
-    else {
+UA_StatusCode
+UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString * UA_RESTRICT message,
+                              UA_Boolean * UA_RESTRICT realloced) {
+    UA_ByteString *current = message;
+    *realloced = UA_FALSE;
+    if(connection->incompleteMessage.length > 0) {
         /* concat the existing incomplete message with the new message */
-        current.data = UA_realloc(connection->incompleteMessage.data,
-                                  connection->incompleteMessage.length + received.length);
-        if(!current.data) {
+        UA_Byte *data = UA_realloc(connection->incompleteMessage.data,
+                                   connection->incompleteMessage.length + message->length);
+        if(!data) {
             /* not enough memory */
             UA_ByteString_deleteMembers(&connection->incompleteMessage);
-            connection->releaseRecvBuffer(connection, &received);
-            return job;
+            connection->releaseRecvBuffer(connection, message);
+            return UA_STATUSCODE_BADOUTOFMEMORY;
         }
-        memcpy(current.data + connection->incompleteMessage.length, received.data, received.length);
-        current.length = connection->incompleteMessage.length + received.length;
-        connection->releaseRecvBuffer(connection, &received);
-        UA_ByteString_init(&connection->incompleteMessage);
+        memcpy(&data[connection->incompleteMessage.length], message->data, message->length);
+        connection->incompleteMessage.data = data;
+        connection->incompleteMessage.length += message->length;
+        connection->releaseRecvBuffer(connection, message);
+        current = &connection->incompleteMessage;
+        *realloced = UA_TRUE;
     }
 
     /* the while loop sets pos to the first element after the last complete message. if a message
        contains garbage, the buffer length is set to contain only the "good" messages before. */
     size_t pos = 0;
-    while(current.length - pos >= 16) {
-        UA_UInt32 msgtype = current.data[pos] + (current.data[pos+1] << 8) + (current.data[pos+2] << 16);
+    size_t delete_at = current->length-1; // garbled message after this point
+    while(current->length - pos >= 16) {
+        UA_UInt32 msgtype = current->data[pos] + (current->data[pos+1] << 8) + (current->data[pos+2] << 16);
         if(msgtype != ('M' + ('S' << 8) + ('G' << 16)) &&
            msgtype != ('O' + ('P' << 8) + ('N' << 16)) &&
            msgtype != ('H' + ('E' << 8) + ('L' << 16)) &&
            msgtype != ('A' + ('C' << 8) + ('K' << 16)) &&
            msgtype != ('C' + ('L' << 8) + ('O' << 16))) {
-            /* the message type is not recognized. throw the remaining bytestring away */
-            current.length = pos;
+            /* the message type is not recognized */
+            delete_at = pos; // throw the remaining message away
             break;
         }
         UA_Int32 length = 0;
         size_t length_pos = pos + 4;
-        UA_StatusCode retval = UA_Int32_decodeBinary(&current, &length_pos, &length);
+        UA_StatusCode retval = UA_Int32_decodeBinary(current, &length_pos, &length);
         if(retval != UA_STATUSCODE_GOOD || length < 16 || length > (UA_Int32)connection->localConf.maxMessageSize) {
             /* the message size is not allowed. throw the remaining bytestring away */
-            current.length = pos;
+            delete_at = pos;
             break;
         }
-        if(length + pos > current.length)
+        if(length + pos > current->length)
             break; /* the message is incomplete. keep the beginning */
         pos += length;
     }
 
-    if(current.length == 0) {
-        /* throw everything away */
-        if(current.data == received.data)
-            connection->releaseRecvBuffer(connection, &received);
-        else
-            UA_ByteString_deleteMembers(&current);
-        return job;
+    /* throw the message away */
+    if(delete_at == 0) {
+        if(!*realloced) {
+            connection->releaseRecvBuffer(connection, message);
+            *realloced = UA_TRUE;
+        } else
+            UA_ByteString_deleteMembers(current);
+        return UA_STATUSCODE_GOOD;
     }
 
+    /* no complete message at all */
     if(pos == 0) {
-        /* no complete message in current */
-        if(current.data == received.data) {
-            /* copy the data into the connection */
-            UA_ByteString_copy(&current, &connection->incompleteMessage);
-            connection->releaseRecvBuffer(connection, &received);
-        } else {
-            /* the data is already copied off the network stack */
-            connection->incompleteMessage = current;
-        }
-        return job;
+        if(!*realloced) {
+            /* store the buffer in the connection */
+            UA_ByteString_copy(current, &connection->incompleteMessage);
+            connection->releaseRecvBuffer(connection, message);
+            *realloced = UA_TRUE;
+        } 
+        return UA_STATUSCODE_GOOD;
     }
 
-    if(current.length != pos) {
-        /* there is an incomplete message at the end of current */
-        connection->incompleteMessage.data = UA_malloc(current.length - pos);
-        if(connection->incompleteMessage.data) {
-            memcpy(connection->incompleteMessage.data, &current.data[pos], current.length - pos);
-            connection->incompleteMessage.length = current.length - pos;
+    /* there remains an incomplete message at the end */
+    if(current->length != pos) {
+        UA_Byte *data = UA_malloc(current->length - pos);
+        if(!data) {
+            UA_ByteString_deleteMembers(&connection->incompleteMessage);
+            if(!*realloced) {
+                connection->releaseRecvBuffer(connection, message);
+                *realloced = UA_TRUE;
+            }
+            return UA_STATUSCODE_BADOUTOFMEMORY;
         }
-        current.length = pos;
+        size_t newlength = current->length - pos;
+        memcpy(data, &current->data[pos], newlength);
+        current->length = pos;
+        if(*realloced)
+            *message = *current;
+        connection->incompleteMessage.data = data;
+        connection->incompleteMessage.length = newlength;
+        return UA_STATUSCODE_GOOD;
     }
 
-    job.job.binaryMessage.message = current;
-    job.job.binaryMessage.connection = connection;
-    if(current.data == received.data)
-        job.type = UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER;
-    else
-        job.type = UA_JOBTYPE_BINARYMESSAGE_ALLOCATED;
-    return job;
+    if(current == &connection->incompleteMessage) {
+        *message = *current;
+        connection->incompleteMessage = UA_BYTESTRING_NULL;
+    }
+    return UA_STATUSCODE_GOOD;
 }
 
 void UA_Connection_detachSecureChannel(UA_Connection *connection) {