Kaynağa Gözat

feat(pubsub) changed PubSubConnection storage structure from ptr + size to TAILQ within the PubSubManager struct

Andreas Ebner 5 yıl önce
ebeveyn
işleme
fcefc5d0bc

+ 2 - 2
src/pubsub/ua_pubsub.h

@@ -54,7 +54,7 @@ UA_PublishedDataSet_clear(UA_Server *server, UA_PublishedDataSet *publishedDataS
 /*               Connection                   */
 /**********************************************/
 //the connection config (public part of connection) object is defined in include/ua_plugin_pubsub.h
-typedef struct{
+typedef struct UA_PubSubConnection{
     UA_PubSubConnectionConfig *config;
     //internal fields
     UA_PubSubChannel *channel;
@@ -62,7 +62,7 @@ typedef struct{
     LIST_HEAD(UA_ListOfWriterGroup, UA_WriterGroup) writerGroups;
     LIST_HEAD(UA_ListOfPubSubReaderGroup, UA_ReaderGroup) readerGroups;
     size_t readerGroupsSize;
-
+    TAILQ_ENTRY(UA_PubSubConnection) listEntry;
     UA_UInt16 configurationFreezeCounter;
 } UA_PubSubConnection;
 

+ 36 - 82
src/pubsub/ua_pubsub_manager.c

@@ -50,8 +50,7 @@ UA_Server_addPubSubConnection(UA_Server *server,
 
     /* Create new connection and add to UA_PubSubManager */
     UA_PubSubConnection *newConnectionsField = (UA_PubSubConnection *)
-        UA_realloc(server->pubSubManager.connections,
-                   sizeof(UA_PubSubConnection) * (server->pubSubManager.connectionsSize + 1));
+        UA_calloc(1, sizeof(UA_PubSubConnection));
     if(!newConnectionsField) {
         UA_PubSubConnectionConfig_clear(tmpConnectionConfig);
         UA_free(tmpConnectionConfig);
@@ -59,52 +58,37 @@ UA_Server_addPubSubConnection(UA_Server *server,
                      "PubSub Connection creation failed. Out of Memory.");
         return UA_STATUSCODE_BADOUTOFMEMORY;
     }
-    server->pubSubManager.connections = newConnectionsField;
+    if (server->pubSubManager.connectionsSize != 0)
+        TAILQ_INSERT_TAIL(&server->pubSubManager.connections, newConnectionsField, listEntry);
+    else {
+        TAILQ_INIT(&server->pubSubManager.connections);
+        TAILQ_INSERT_HEAD(&server->pubSubManager.connections, newConnectionsField, listEntry);
+    }
+
     server->pubSubManager.connectionsSize++;
 
-    UA_PubSubConnection *newConnection =
-        &server->pubSubManager.connections[server->pubSubManager.connectionsSize-1];
-
-    /* Initialize the new connection */
-    memset(newConnection, 0, sizeof(UA_PubSubConnection));
-    LIST_INIT(&newConnection->writerGroups);
-    //workaround - fixing issue with queue.h and realloc.
-    for(size_t n = 0; n < server->pubSubManager.connectionsSize; n++){
-        if(server->pubSubManager.connections[n].writerGroups.lh_first){
-            server->pubSubManager.connections[n].writerGroups.lh_first->listEntry.le_prev = &server->pubSubManager.connections[n].writerGroups.lh_first;
-        }
-    }
-    newConnection->config = tmpConnectionConfig;
+    LIST_INIT(&newConnectionsField->writerGroups);
+    newConnectionsField->config = tmpConnectionConfig;
 
     /* Open the channel */
-    newConnection->channel = tl->createPubSubChannel(newConnection->config);
-    if(!newConnection->channel) {
-        UA_PubSubConnection_clear(server, newConnection);
+    newConnectionsField->channel = tl->createPubSubChannel(newConnectionsField->config);
+    if(!newConnectionsField->channel) {
+        UA_PubSubConnection_clear(server, newConnectionsField);
+        TAILQ_REMOVE(&server->pubSubManager.connections, newConnectionsField, listEntry);
         server->pubSubManager.connectionsSize--;
-        /* Keep the realloced (longer) array if entries remain */
-        if(server->pubSubManager.connectionsSize == 0) {
-            UA_free(server->pubSubManager.connections);
-            server->pubSubManager.connections = NULL;
-        }
+        UA_free(newConnectionsField);
         UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
                      "PubSub Connection creation failed. Transport layer creation problem.");
         return UA_STATUSCODE_BADINTERNALERROR;
     }
 
-    UA_PubSubManager_generateUniqueNodeId(server, &newConnection->identifier);
+    UA_PubSubManager_generateUniqueNodeId(server, &newConnectionsField->identifier);
 
     if(connectionIdentifier)
-        UA_NodeId_copy(&newConnection->identifier, connectionIdentifier);
+        UA_NodeId_copy(&newConnectionsField->identifier, connectionIdentifier);
 
-    /* update all writerGroups and set new PTR */
-    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
-        UA_WriterGroup *wg;
-        LIST_FOREACH(wg, &server->pubSubManager.connections[i].writerGroups, listEntry){
-            wg->linkedConnectionPtr = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
-        }
-    }
 #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
-    addPubSubConnectionRepresentation(server, newConnection);
+    addPubSubConnectionRepresentation(server, newConnectionsField);
 #endif
     return UA_STATUSCODE_GOOD;
 }
@@ -112,53 +96,18 @@ UA_Server_addPubSubConnection(UA_Server *server,
 UA_StatusCode
 UA_Server_removePubSubConnection(UA_Server *server, const UA_NodeId connection) {
     //search the identified Connection and store the Connection index
-    size_t connectionIndex;
-    UA_PubSubConnection *currentConnection = NULL;
-    for(connectionIndex = 0; connectionIndex < server->pubSubManager.connectionsSize; connectionIndex++){
-        if(UA_NodeId_equal(&connection, &server->pubSubManager.connections[connectionIndex].identifier)){
-            currentConnection = &server->pubSubManager.connections[connectionIndex];
-            break;
-        }
-    }
+    UA_PubSubConnection *currentConnection = UA_PubSubConnection_findConnectionbyId(server, connection);
     if(!currentConnection)
         return UA_STATUSCODE_BADNOTFOUND;
 
 #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
     removePubSubConnectionRepresentation(server, currentConnection);
 #endif
-    UA_PubSubConnection_clear(server, currentConnection);
     server->pubSubManager.connectionsSize--;
-    //remove the connection from the pubSubManager, move the last connection
-    //into the allocated memory of the deleted connection
-    if(server->pubSubManager.connectionsSize != connectionIndex){
-        memcpy(&server->pubSubManager.connections[connectionIndex],
-               &server->pubSubManager.connections[server->pubSubManager.connectionsSize],
-               sizeof(UA_PubSubConnection));
-    }
 
-    if(server->pubSubManager.connectionsSize <= 0){
-        UA_free(server->pubSubManager.connections);
-        server->pubSubManager.connections = NULL;
-    } else {
-        server->pubSubManager.connections = (UA_PubSubConnection *)
-                UA_realloc(server->pubSubManager.connections, sizeof(UA_PubSubConnection) * server->pubSubManager.connectionsSize);
-        if(!server->pubSubManager.connections){
-            return UA_STATUSCODE_BADINTERNALERROR;
-        }
-        //workaround - fixing issue with queue.h and realloc.
-        for(size_t n = 0; n < server->pubSubManager.connectionsSize; n++){
-            if(server->pubSubManager.connections[n].writerGroups.lh_first){
-                server->pubSubManager.connections[n].writerGroups.lh_first->listEntry.le_prev = &server->pubSubManager.connections[n].writerGroups.lh_first;
-            }
-        }
-        /* update all writerGroups and set new PTR */
-        for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
-            UA_WriterGroup *wg;
-            LIST_FOREACH(wg, &server->pubSubManager.connections[i].writerGroups, listEntry){
-                wg->linkedConnectionPtr = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
-            }
-        }
-    }
+    UA_PubSubConnection_clear(server, currentConnection);
+    TAILQ_REMOVE(&server->pubSubManager.connections, currentConnection, listEntry);
+    UA_free(currentConnection);
     return UA_STATUSCODE_GOOD;
 }
 
@@ -288,9 +237,10 @@ UA_Server_removePublishedDataSet(UA_Server *server, const UA_NodeId pds) {
     }
 
     //search for referenced writers -> delete this writers. (Standard: writer must be connected with PDS)
-    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
+    UA_PubSubConnection *tmpConnectoin;
+    TAILQ_FOREACH(tmpConnectoin, &server->pubSubManager.connections, listEntry){
         UA_WriterGroup *writerGroup;
-        LIST_FOREACH(writerGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
+        LIST_FOREACH(writerGroup, &tmpConnectoin->writerGroups, listEntry){
             UA_DataSetWriter *currentWriter, *tmpWriterGroup;
             LIST_FOREACH_SAFE(currentWriter, &writerGroup->writers, listEntry, tmpWriterGroup){
                 if(UA_NodeId_equal(&currentWriter->connectedDataSet, &publishedDataSet->identifier)){
@@ -354,11 +304,14 @@ UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager) {
     UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "PubSub cleanup was called.");
 
     /* Stop and unfreeze all WriterGroups */
-    for(size_t i = 0; i < pubSubManager->connectionsSize; i++) {
-        UA_WriterGroup *writerGroup;
-        LIST_FOREACH(writerGroup, &pubSubManager->connections[i].writerGroups, listEntry) {
-            UA_WriterGroup_setPubSubState(server, UA_PUBSUBSTATE_DISABLED, writerGroup);
-            UA_Server_unfreezeWriterGroupConfiguration(server, writerGroup->identifier);
+    UA_PubSubConnection *tmpConnection;
+    TAILQ_FOREACH(tmpConnection, &server->pubSubManager.connections, listEntry){
+        for(size_t i = 0; i < pubSubManager->connectionsSize; i++) {
+            UA_WriterGroup *writerGroup;
+            LIST_FOREACH(writerGroup, &tmpConnection->writerGroups, listEntry) {
+                UA_WriterGroup_setPubSubState(server, UA_PUBSUBSTATE_DISABLED, writerGroup);
+                UA_Server_unfreezeWriterGroupConfiguration(server, writerGroup->identifier);
+            }
         }
     }
 
@@ -367,8 +320,9 @@ UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager) {
     server->config.pubsubTransportLayersSize = 0;
 
     //remove Connections and WriterGroups
-    while(pubSubManager->connectionsSize > 0){
-        UA_Server_removePubSubConnection(server, pubSubManager->connections[pubSubManager->connectionsSize-1].identifier);
+    UA_PubSubConnection *tmpConnection1, *tmpConnection2;
+    TAILQ_FOREACH_SAFE(tmpConnection1, &server->pubSubManager.connections, listEntry, tmpConnection2){
+        UA_Server_removePubSubConnection(server, tmpConnection1->identifier);
     }
     while(pubSubManager->publishedDataSetsSize > 0){
         UA_Server_removePublishedDataSet(server, pubSubManager->publishedDataSets[pubSubManager->publishedDataSetsSize-1].identifier);

+ 2 - 1
src/pubsub/ua_pubsub_manager.h

@@ -20,7 +20,8 @@ typedef struct UA_PubSubManager{
     //TODO connection and pds store to linked list
     //Connections and PublishedDataSets can exist alone (own lifecycle) -> top level components
     size_t connectionsSize;
-    UA_PubSubConnection *connections;
+    TAILQ_HEAD(UA_ListOfPubSubConnection, UA_PubSubConnection) connections;
+    //TAILQ_HEAD(UA_ListOfPublishedDataSet, UA_DataSetField) publishedDataSets;
     size_t publishedDataSetsSize;
     UA_PublishedDataSet *publishedDataSets;
 } UA_PubSubManager;

+ 6 - 5
src/pubsub/ua_pubsub_reader.c

@@ -248,9 +248,10 @@ getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg,
 
 UA_ReaderGroup *
 UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
-    for(size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
+    UA_PubSubConnection *pubSubConnection;
+    TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
         UA_ReaderGroup* readerGroup = NULL;
-        LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
+        LIST_FOREACH(readerGroup, &pubSubConnection->readerGroups, listEntry) {
             if(UA_NodeId_equal(&identifier, &readerGroup->identifier)) {
                 return readerGroup;
             }
@@ -261,15 +262,15 @@ UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
 }
 
 UA_DataSetReader *UA_ReaderGroup_findDSRbyId(UA_Server *server, UA_NodeId identifier) {
-    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++) {
+    UA_PubSubConnection *pubSubConnection;
+    TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
         UA_ReaderGroup* readerGroup = NULL;
-        LIST_FOREACH(readerGroup, &server->pubSubManager.connections[i].readerGroups, listEntry) {
+        LIST_FOREACH(readerGroup, &pubSubConnection->readerGroups, listEntry) {
             UA_DataSetReader *tmpReader;
             LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
                 if(UA_NodeId_equal(&tmpReader->identifier, &identifier)) {
                     return tmpReader;
                 }
-
             }
         }
     }

+ 10 - 7
src/pubsub/ua_pubsub_writer.c

@@ -89,9 +89,10 @@ UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connectio
 
 UA_PubSubConnection *
 UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
-    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
-        if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
-            return &server->pubSubManager.connections[i];
+    UA_PubSubConnection *pubSubConnection;
+    TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
+        if(UA_NodeId_equal(&connectionIdentifier, &pubSubConnection->identifier)){
+            return pubSubConnection;
         }
     }
     return NULL;
@@ -790,9 +791,10 @@ UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
 
 UA_DataSetWriter *
 UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
-    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
+    UA_PubSubConnection *pubSubConnection;
+    TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
         UA_WriterGroup *tmpWriterGroup;
-        LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
+        LIST_FOREACH(tmpWriterGroup, &pubSubConnection->writerGroups, listEntry){
             UA_DataSetWriter *tmpWriter;
             LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
                 if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
@@ -993,9 +995,10 @@ UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdenti
 
 UA_WriterGroup *
 UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
-    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
+    UA_PubSubConnection *tmpConnection;
+    TAILQ_FOREACH(tmpConnection, &server->pubSubManager.connections, listEntry){
         UA_WriterGroup *tmpWriterGroup;
-        LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
+        LIST_FOREACH(tmpWriterGroup, &tmpConnection->writerGroups, listEntry) {
             if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
                 return tmpWriterGroup;
             }

+ 5 - 4
src/server/ua_server.c

@@ -624,10 +624,11 @@ UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
 
 #if defined(UA_ENABLE_PUBSUB_MQTT)
     /* Listen on the pubsublayer, but only if the yield function is set */
-    for(size_t i = 0; i < server->pubSubManager.connectionsSize; ++i) {
-        UA_PubSubConnection *ps = &server->pubSubManager.connections[i];
-            if(ps && ps->channel->yield){
-                ps->channel->yield(ps->channel, timeout);
+    UA_PubSubConnection *connection;
+    TAILQ_FOREACH(connection, &server->pubSubManager.connections, listEntry){
+        UA_PubSubConnection *ps = connection;
+        if(ps && ps->channel->yield){
+            ps->channel->yield(ps->channel, timeout);
         }
     }
 #endif

+ 5 - 5
tests/pubsub/check_pubsub_connection_udp.c

@@ -45,10 +45,10 @@ START_TEST(AddConnectionsWithMinimalValidConfiguration){
     retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
     ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
     ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
-    ck_assert(server->pubSubManager.connections[0].channel != NULL);
+    ck_assert(! TAILQ_EMPTY(&server->pubSubManager.connections));
     retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
     ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
-    ck_assert(server->pubSubManager.connections[1].channel != NULL);
+    ck_assert(! TAILQ_EMPTY(&server->pubSubManager.connections));
     ck_assert_int_eq(server->pubSubManager.connectionsSize, 2);
 } END_TEST
 
@@ -65,13 +65,13 @@ START_TEST(AddRemoveAddConnectionWithMinimalValidConfiguration){
         retVal = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
         ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
         ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
-        ck_assert(server->pubSubManager.connections[0].channel != NULL);
+        ck_assert(! TAILQ_EMPTY(&server->pubSubManager.connections));
         retVal |= UA_Server_removePubSubConnection(server, connectionIdent);
         ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
         ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
         retVal = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
         ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
-        ck_assert(server->pubSubManager.connections[0].channel != NULL);
+        ck_assert(! TAILQ_EMPTY(&server->pubSubManager.connections));
         ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
 } END_TEST
 
@@ -142,7 +142,7 @@ START_TEST(AddSingleConnectionWithMaximalConfiguration){
     UA_StatusCode retVal = UA_Server_addPubSubConnection(server, &connectionConf, &connection);
     ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
     ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
-    ck_assert(server->pubSubManager.connections[0].channel != NULL);
+    ck_assert(! TAILQ_EMPTY(&server->pubSubManager.connections));
 } END_TEST
 
 START_TEST(GetMaximalConnectionConfigurationAndCompareValues){