Browse Source

nodestore_replace checks if the old node is already obsolete

Julius Pfrommer 10 years ago
parent
commit
28281191df

+ 24 - 19
src/server/ua_nodestore.c

@@ -50,7 +50,7 @@ static INLINE UA_Int16 higher_prime_index(hash_t n) {
 
 /* Returns UA_TRUE if an entry was found under the nodeid. Otherwise, returns
    false and sets slot to a pointer to the next free slot. */
-static UA_Boolean __containsNodeId(const UA_NodeStore *ns, const UA_NodeId *nodeid, struct nodeEntry ***entry) {
+static UA_Boolean containsNodeId(const UA_NodeStore *ns, const UA_NodeId *nodeid, struct nodeEntry ***entry) {
     hash_t         h     = hash(nodeid);
     UA_UInt32      size  = ns->size;
     hash_t         index = mod(h, size);
@@ -92,7 +92,7 @@ static UA_Boolean __containsNodeId(const UA_NodeStore *ns, const UA_NodeId *node
 /* The following function changes size of memory allocated for the entries and
    repeatedly inserts the table elements. The occupancy of the table after the
    call will be about 50%. */
-static UA_StatusCode __expand(UA_NodeStore *ns) {
+static UA_StatusCode expand(UA_NodeStore *ns) {
     UA_Int32 osize = ns->size;
     UA_Int32 count = ns->count;
     /* Resize only when table after removal of unused elements is either too full or too empty.  */
@@ -117,7 +117,7 @@ static UA_StatusCode __expand(UA_NodeStore *ns) {
         if(!oentries[i])
             continue;
         struct nodeEntry **e;
-        __containsNodeId(ns, &(*oentries[i]).node.nodeId, &e);  /* We know this returns an empty entry here */
+        containsNodeId(ns, &(*oentries[i]).node.nodeId, &e);  /* We know this returns an empty entry here */
         *e = oentries[i];
         j++;
     }
@@ -127,7 +127,7 @@ static UA_StatusCode __expand(UA_NodeStore *ns) {
 }
 
 /* Marks the entry dead and deletes if necessary. */
-static void __deleteEntry(struct nodeEntry *entry) {
+static void deleteEntry(struct nodeEntry *entry) {
     if(entry->refcount > 0)
         return;
     const UA_Node *node = &entry->node;
@@ -163,7 +163,7 @@ static void __deleteEntry(struct nodeEntry *entry) {
     UA_free(entry);
 }
 
-static INLINE struct nodeEntry * __nodeEntryFromNode(const UA_Node *node) {
+static INLINE struct nodeEntry * nodeEntryFromNode(const UA_Node *node) {
     UA_UInt32 nodesize = 0;
     /* Copy the node into the entry. Then reset the original node. It shall no longer be used. */
     switch(node->nodeClass) {
@@ -229,7 +229,7 @@ void UA_NodeStore_delete(UA_NodeStore *ns) {
     for(UA_UInt32 i = 0;i < size;i++) {
         if(entries[i] != UA_NULL) {
             entries[i]->refcount &= ~ALIVE_BIT; // mark dead
-            __deleteEntry(entries[i]);
+            deleteEntry(entries[i]);
             entries[i] = UA_NULL;
             ns->count--;
         }
@@ -241,7 +241,7 @@ void UA_NodeStore_delete(UA_NodeStore *ns) {
 
 UA_StatusCode UA_NodeStore_insert(UA_NodeStore *ns, const UA_Node **node, UA_Boolean getManaged) {
     if(ns->size * 3 <= ns->count * 4) {
-        if(__expand(ns) != UA_STATUSCODE_GOOD)
+        if(expand(ns) != UA_STATUSCODE_GOOD)
             return UA_STATUSCODE_BADINTERNALERROR;
     }
 
@@ -257,18 +257,18 @@ UA_StatusCode UA_NodeStore_insert(UA_NodeStore *ns, const UA_Node **node, UA_Boo
         hash_t increase = mod2(identifier, size);
         while(UA_TRUE) {
             nodeId->identifier.numeric = identifier;
-            if(!__containsNodeId(ns, nodeId, &slot))
+            if(!containsNodeId(ns, nodeId, &slot))
                 break;
             identifier += increase;
             if(identifier >= size)
                 identifier -= size;
         }
     } else {
-        if(__containsNodeId(ns, nodeId, &slot))
+        if(containsNodeId(ns, nodeId, &slot))
             return UA_STATUSCODE_BADNODEIDEXISTS;
     }
     
-    struct nodeEntry *entry = __nodeEntryFromNode(*node);
+    struct nodeEntry *entry = nodeEntryFromNode(*node);
     if(!entry)
         return UA_STATUSCODE_BADOUTOFMEMORY;
 
@@ -286,18 +286,23 @@ UA_StatusCode UA_NodeStore_insert(UA_NodeStore *ns, const UA_Node **node, UA_Boo
     return UA_STATUSCODE_GOOD;
 }
 
-UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node **node, UA_Boolean getManaged) {
+UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node *oldNode,
+                                   const UA_Node **node, UA_Boolean getManaged) {
     struct nodeEntry **slot;
     const UA_NodeId *nodeId = &(*node)->nodeId;
-    if(!__containsNodeId(ns, nodeId, &slot))
+    if(!containsNodeId(ns, nodeId, &slot))
         return UA_STATUSCODE_BADNODEIDUNKNOWN;
 
-    struct nodeEntry *entry = __nodeEntryFromNode(*node);
+    // you try to replace an obsolete node (without threading this can't happen
+    // if the user doesn't do it deliberately in his code)
+    if(&(*slot)->node != oldNode)
+        return UA_STATUSCODE_BADINTERNALERROR;
+
+    struct nodeEntry *entry = nodeEntryFromNode(*node);
     if(!entry)
         return UA_STATUSCODE_BADOUTOFMEMORY;
 
     (*slot)->refcount &= ~ALIVE_BIT; // mark dead
-    __deleteEntry(*slot);
     *slot = entry;
 
     if(getManaged) {
@@ -312,7 +317,7 @@ UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node **node, UA_Bo
 
 const UA_Node * UA_NodeStore_get(const UA_NodeStore *ns, const UA_NodeId *nodeid) {
     struct nodeEntry **slot;
-    if(!__containsNodeId(ns, nodeid, &slot))
+    if(!containsNodeId(ns, nodeid, &slot))
         return UA_NULL;
     (*slot)->refcount++;
     return &(*slot)->node;
@@ -320,18 +325,18 @@ const UA_Node * UA_NodeStore_get(const UA_NodeStore *ns, const UA_NodeId *nodeid
 
 UA_StatusCode UA_NodeStore_remove(UA_NodeStore *ns, const UA_NodeId *nodeid) {
     struct nodeEntry **slot;
-    if(!__containsNodeId(ns, nodeid, &slot))
+    if(!containsNodeId(ns, nodeid, &slot))
         return UA_STATUSCODE_BADNODEIDUNKNOWN;
 
     // Check before if deleting the node makes the UA_NodeStore inconsistent.
     (*slot)->refcount &= ~ALIVE_BIT; // mark dead
-    __deleteEntry(*slot);
+    deleteEntry(*slot);
     *slot = UA_NULL;
     ns->count--;
 
     /* Downsize the hashmap if it is very empty */
     if(ns->count * 8 < ns->size && ns->size > 32)
-        __expand(ns); // this can fail. we just continue with the bigger hashmap.
+        expand(ns); // this can fail. we just continue with the bigger hashmap.
 
     return UA_STATUSCODE_GOOD;
 }
@@ -346,5 +351,5 @@ void UA_NodeStore_iterate(const UA_NodeStore *ns, UA_NodeStore_nodeVisitor visit
 void UA_NodeStore_release(const UA_Node *managed) {
     struct nodeEntry *entry = (struct nodeEntry *) ((char*)managed - offsetof(struct nodeEntry, node));
     entry->refcount--;
-    __deleteEntry(entry);    
+    deleteEntry(entry);
 }

+ 6 - 2
src/server/ua_nodestore.h

@@ -44,8 +44,12 @@ UA_StatusCode UA_NodeStore_insert(UA_NodeStore *ns, const UA_Node **node, UA_Boo
 /** @brief Replace an existing node in the nodestore
 
     With the getManaged flag, the node pointer is replaced with the managed
-    pointer. Otherwise, it is set to UA_NULL. */
-UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node **node, UA_Boolean getManaged);
+    pointer. Otherwise, it is set to UA_NULL.
+
+    If the return value is UA_STATUSCODE_BADINTERNALERROR, try again. Presumably
+    the oldNode was already replaced by another thread.
+*/
+UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node *oldNode, const UA_Node **node, UA_Boolean getManaged);
 
 /** @brief Remove a node from the namespace. Always succeeds, even if the node
     was not found. */

+ 42 - 35
src/server/ua_nodestore_concurrent.c

@@ -8,12 +8,12 @@
 
 #define ALIVE_BIT (1 << 15) /* Alive bit in the refcount */
 
-typedef struct UA_NodeStore_Entry {
+struct nodeEntry {
     struct cds_lfht_node htn;      /* contains next-ptr for urcu-hashmap */
     struct rcu_head      rcu_head; /* For call-rcu */
     UA_UInt16 refcount;            /* Counts the amount of readers on it [alive-bit, 15 counter-bits] */
     const UA_Node node;            /* Might be cast from any _bigger_ UA_Node* type. Allocate enough memory! */
-} UA_NodeStore_Entry;
+};
 
 struct UA_NodeStore {
     struct cds_lfht *ht; /* Hash table */
@@ -26,35 +26,27 @@ static inline void node_deleteMembers(const UA_Node *node) {
     case UA_NODECLASS_OBJECT:
         UA_ObjectNode_deleteMembers((UA_ObjectNode *)node);
         break;
-
     case UA_NODECLASS_VARIABLE:
         UA_VariableNode_deleteMembers((UA_VariableNode *)node);
         break;
-
     case UA_NODECLASS_METHOD:
         UA_MethodNode_deleteMembers((UA_MethodNode *)node);
         break;
-
     case UA_NODECLASS_OBJECTTYPE:
         UA_ObjectTypeNode_deleteMembers((UA_ObjectTypeNode *)node);
         break;
-
     case UA_NODECLASS_VARIABLETYPE:
         UA_VariableTypeNode_deleteMembers((UA_VariableTypeNode *)node);
         break;
-
     case UA_NODECLASS_REFERENCETYPE:
         UA_ReferenceTypeNode_deleteMembers((UA_ReferenceTypeNode *)node);
         break;
-
     case UA_NODECLASS_DATATYPE:
         UA_DataTypeNode_deleteMembers((UA_DataTypeNode *)node);
         break;
-
     case UA_NODECLASS_VIEW:
         UA_ViewNode_deleteMembers((UA_ViewNode *)node);
         break;
-
     default:
         UA_assert(UA_FALSE);
         break;
@@ -64,8 +56,7 @@ static inline void node_deleteMembers(const UA_Node *node) {
 /* We are in a rcu_read lock. So the node will not be freed under our feet. */
 static int compare(struct cds_lfht_node *htn, const void *orig) {
     const UA_NodeId *origid = (const UA_NodeId *)orig;
-    const UA_NodeId *newid  = &((UA_NodeStore_Entry *)htn)->node.nodeId;   /* The htn is first in the entry structure. */
-
+    const UA_NodeId *newid  = &((struct nodeEntry *)htn)->node.nodeId;   /* The htn is first in the entry structure. */
     return UA_NodeId_equal(newid, origid);
 }
 
@@ -74,7 +65,7 @@ static int compare(struct cds_lfht_node *htn, const void *orig) {
    section) increased the refcount, we only need to wait for the refcount
    to reach zero. */
 static void markDead(struct rcu_head *head) {
-    UA_NodeStore_Entry *entry = caa_container_of(head, UA_NodeStore_Entry, rcu_head);
+    struct nodeEntry *entry = caa_container_of(head, struct nodeEntry, rcu_head);
     uatomic_and(&entry->refcount, ~ALIVE_BIT); // set the alive bit to zero
     if(uatomic_read(&entry->refcount) > 0)
         return;
@@ -86,7 +77,7 @@ static void markDead(struct rcu_head *head) {
 
 /* Free the entry if it is dead and nobody uses it anymore */
 void UA_NodeStore_release(const UA_Node *managed) {
-    UA_NodeStore_Entry *entry = caa_container_of(managed, UA_NodeStore_Entry, node); // pointer to the first entry
+    struct nodeEntry *entry = caa_container_of(managed, struct nodeEntry, node); // pointer to the first entry
     if(uatomic_sub_return(&entry->refcount, 1) > 0)
         return;
 
@@ -119,7 +110,7 @@ void UA_NodeStore_delete(UA_NodeStore *ns) {
     while(iter.node != UA_NULL) {
         found_htn = cds_lfht_iter_get_node(&iter);
         if(!cds_lfht_del(ht, found_htn)) {
-            UA_NodeStore_Entry *entry = caa_container_of(found_htn, UA_NodeStore_Entry, htn);
+            struct nodeEntry *entry = caa_container_of(found_htn, struct nodeEntry, htn);
             call_rcu(&entry->rcu_head, markDead);
         }
         cds_lfht_next(ht, &iter);
@@ -162,8 +153,8 @@ UA_StatusCode UA_NodeStore_insert(UA_NodeStore *ns, const UA_Node **node, UA_Boo
         return UA_STATUSCODE_BADINTERNALERROR;
     }
 
-    UA_NodeStore_Entry *entry;
-    if(!(entry = UA_alloc(sizeof(UA_NodeStore_Entry) - sizeof(UA_Node) + nodesize)))
+    struct nodeEntry *entry;
+    if(!(entry = UA_alloc(sizeof(struct nodeEntry) - sizeof(UA_Node) + nodesize)))
         return UA_STATUSCODE_BADOUTOFMEMORY;
     memcpy((void*)&entry->node, *node, nodesize);
 
@@ -216,7 +207,8 @@ UA_StatusCode UA_NodeStore_insert(UA_NodeStore *ns, const UA_Node **node, UA_Boo
     return UA_STATUSCODE_GOOD;
 }
 
-UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node **node, UA_Boolean getManaged) {
+UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node *oldNode,
+                                   const UA_Node **node, UA_Boolean getManaged) {
     UA_UInt32 nodesize;
     /* Copy the node into the entry. Then reset the original node. It shall no longer be used. */
     switch((*node)->nodeClass) {
@@ -248,8 +240,8 @@ UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node **node, UA_Bo
         return UA_STATUSCODE_BADINTERNALERROR;
     }
 
-    UA_NodeStore_Entry *entry;
-    if(!(entry = UA_alloc(sizeof(UA_NodeStore_Entry) - sizeof(UA_Node) + nodesize)))
+    struct nodeEntry *entry;
+    if(!(entry = UA_alloc(sizeof(struct nodeEntry) - sizeof(UA_Node) + nodesize)))
         return UA_STATUSCODE_BADOUTOFMEMORY;
     memcpy((void*)&entry->node, *node, nodesize);
 
@@ -265,26 +257,41 @@ UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node **node, UA_Bo
     cds_lfht_lookup(ns->ht, h, compare, &(*node)->nodeId, &iter);
     struct cds_lfht_node *result = cds_lfht_iter_get_node(&iter);
 
-    if(result && cds_lfht_replace(ns->ht, &iter, h, compare, &(*node)->nodeId, &entry->htn) == 0) {
-        UA_NodeStore_Entry *entry = caa_container_of(result, UA_NodeStore_Entry, htn);
-        /* If an entry got replaced, mark it as dead. */
-        call_rcu(&entry->rcu_head, markDead);
-        rcu_read_unlock();
-    } else {
+    /* No node found that can be replaced */
+    if(!result) {
         rcu_read_unlock();
         UA_free(entry);
         return UA_STATUSCODE_BADNODEIDUNKNOWN;
     }
 
+    struct nodeEntry *found_entry = (struct nodeEntry *)cds_lfht_iter_get_node(&iter);
+    /* The node we found is obsolete*/
+    if(&found_entry->node != oldNode) {
+        rcu_read_unlock();
+        UA_free(entry);
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+
     /* The old node is replaced by a managed node. */
-    UA_free((UA_Node *)*node);
-    if(getManaged)
-        *node = &entry->node;
-    else
-        *node = UA_NULL;
+    if(cds_lfht_replace(ns->ht, &iter, h, compare, &(*node)->nodeId, &entry->htn) == 0) {
+        struct nodeEntry *entry = caa_container_of(result, struct nodeEntry, htn);
+        /* If an entry got replaced, mark it as dead. */
+        call_rcu(&entry->rcu_head, markDead);
+        rcu_read_unlock();
 
-    return UA_STATUSCODE_GOOD;
+        UA_free((UA_Node *)*node);
+        if(getManaged)
+            *node = &entry->node;
+        else
+            *node = UA_NULL;
 
+        return UA_STATUSCODE_GOOD;
+    }
+    
+    /* Replacing failed. Maybe the node got replaced just before this thread tried to.*/
+    rcu_read_unlock();
+    UA_free(entry);
+    return UA_STATUSCODE_BADINTERNALERROR;
 }
 
 UA_StatusCode UA_NodeStore_remove(UA_NodeStore *ns, const UA_NodeId *nodeid) {
@@ -301,7 +308,7 @@ UA_StatusCode UA_NodeStore_remove(UA_NodeStore *ns, const UA_NodeId *nodeid) {
         return UA_STATUSCODE_BADNODEIDUNKNOWN;
     }
 
-    UA_NodeStore_Entry *entry = caa_container_of(found_htn, UA_NodeStore_Entry, htn);
+    struct nodeEntry *entry = caa_container_of(found_htn, struct nodeEntry, htn);
     call_rcu(&entry->rcu_head, markDead);
     rcu_read_unlock();
 
@@ -314,7 +321,7 @@ const UA_Node * UA_NodeStore_get(const UA_NodeStore *ns, const UA_NodeId *nodeid
 
     rcu_read_lock();
     cds_lfht_lookup(ns->ht, nhash, compare, nodeid, &iter);
-    UA_NodeStore_Entry *found_entry = (UA_NodeStore_Entry *)cds_lfht_iter_get_node(&iter);
+    struct nodeEntry *found_entry = (struct nodeEntry *)cds_lfht_iter_get_node(&iter);
 
     if(!found_entry) {
         rcu_read_unlock();
@@ -334,7 +341,7 @@ void UA_NodeStore_iterate(const UA_NodeStore *ns, UA_NodeStore_nodeVisitor visit
     rcu_read_lock();
     cds_lfht_first(ht, &iter);
     while(iter.node != UA_NULL) {
-        UA_NodeStore_Entry *found_entry = (UA_NodeStore_Entry *)cds_lfht_iter_get_node(&iter);
+        struct nodeEntry *found_entry = (struct nodeEntry *)cds_lfht_iter_get_node(&iter);
         uatomic_inc(&found_entry->refcount);
         const UA_Node      *node = &found_entry->node;
         rcu_read_unlock();

+ 10 - 7
src/server/ua_server_addressspace.c

@@ -43,8 +43,8 @@ void UA_Server_addScalarVariableNode(UA_Server *server, UA_QualifiedName *browse
 }
 
 /* Adds a one-way reference to the local nodestore */
-UA_StatusCode addOneWayReferenceWithSession (UA_Server *server, UA_Session *session,
-                                             const UA_AddReferencesItem *item) {
+UA_StatusCode addOneWayReferenceWithSession(UA_Server *server, UA_Session *session,
+                                            const UA_AddReferencesItem *item) {
     // use the servers nodestore
     const UA_Node *node = UA_NodeStore_get(server->nodestore, &item->sourceNodeId);
     // todo differentiate between error codes
@@ -84,12 +84,15 @@ UA_StatusCode addOneWayReferenceWithSession (UA_Server *server, UA_Session *sess
     UA_free(old_refs);
     newNode->references = new_refs;
     newNode->referencesSize = ++count;
-    retval = UA_NodeStore_replace(server->nodestore, (const UA_Node **)&newNode, UA_FALSE);
-    if(retval)
-        nodeVT->delete(newNode);
+    retval = UA_NodeStore_replace(server->nodestore, node, (const UA_Node **)&newNode, UA_FALSE);
     UA_NodeStore_release(node);
-
-    return retval;
+    if(retval != UA_STATUSCODE_BADINTERNALERROR)
+        return retval;
+    
+    // error presumably because the node was replaced and an old version was updated
+    // just try again
+    nodeVT->delete(newNode);
+    return addOneWayReferenceWithSession(server, session, item);
 }
 
 UA_StatusCode UA_Server_addReference(UA_Server *server, const UA_AddReferencesItem *item) {

+ 5 - 3
tests/check_nodestore.c

@@ -35,9 +35,10 @@ UA_StatusCode createNode(UA_Node** p, UA_Int16 nsid, UA_Int32 id) {
 START_TEST(replaceExistingNode) {
 	UA_NodeStore *ns = UA_NodeStore_new();
 	UA_Node* n1; createNode(&n1,0,2253);
-	UA_NodeStore_insert(ns, (const UA_Node **)&n1, UA_FALSE);
+	UA_NodeStore_insert(ns, (const UA_Node **)&n1, UA_TRUE);
 	UA_Node* n2; createNode(&n2,0,2253);
-    UA_StatusCode retval = UA_NodeStore_replace(ns, (const UA_Node **)&n2, UA_FALSE);
+    UA_StatusCode retval = UA_NodeStore_replace(ns, n1, (const UA_Node **)&n2, UA_FALSE);
+    UA_NodeStore_release(n1);
     
 	ck_assert_int_eq(retval, UA_STATUSCODE_GOOD);
     
@@ -47,8 +48,9 @@ END_TEST
 
 START_TEST(replaceNonExistingNode) {
 	UA_NodeStore *ns = UA_NodeStore_new();
+	UA_Node* n1; createNode(&n1,0,2253);
 	UA_Node* n2; createNode(&n2,0,2253);
-    UA_StatusCode retval = UA_NodeStore_replace(ns, (const UA_Node **)&n2, UA_FALSE);
+    UA_StatusCode retval = UA_NodeStore_replace(ns, n1, (const UA_Node **)&n2, UA_FALSE);
     
 	ck_assert_int_ne(retval, UA_STATUSCODE_GOOD);