Browse Source

refactor(plugin): Replace CAS in nodestore with read/write barriers

The current use-case for concurrent access to the nodestore only covers
interrupts where the Publisher wants to read. For full read/write
concurrency the RCU nodestore has to be recovered.

This commit simplifies the nodestore by removing the CAS operations.
Read/write barriers are used to ensure that an interrupt cannot find
 dangling pointers to entries that are currently freed.
Julius Pfrommer 5 years ago
parent
commit
47df955bb9
1 changed files with 23 additions and 46 deletions
  1. 23 46
      plugins/ua_nodestore_hashmap.c

+ 23 - 46
plugins/ua_nodestore_hashmap.c

@@ -17,11 +17,7 @@
  *
  * - Tombstone or non-matching NodeId: continue searching
  * - Matching NodeId: Return the entry
- * - NULL: Abort the search
- *
- * The nodestore uses atomic operations to set entries of the hash-map. So the
- * nodestore allows read-access from an interrupt without seeing corrupted
- * nodes. */
+ * - NULL: Abort the search */
 
 typedef struct UA_NodeMapEntry {
     struct UA_NodeMapEntry *orig; /* the version this is a copy from (or NULL) */
@@ -196,20 +192,6 @@ cleanupNodeMapEntry(UA_NodeMapEntry *entry) {
         deleteNodeMapEntry(entry);
 }
 
-static UA_StatusCode
-clearSlot(UA_NodeMap *ns, UA_NodeMapSlot *slot) {
-    UA_NodeMapEntry *entry = slot->entry;
-    if(UA_atomic_cmpxchg((void**)&slot->entry, entry, UA_NODEMAP_TOMBSTONE) != entry)
-        return UA_STATUSCODE_BADINTERNALERROR;
-    entry->deleted = true;
-    cleanupNodeMapEntry(entry);
-    --ns->count;
-    /* Downsize the hashmap if it is very empty */
-    if(ns->count * 8 < ns->size && ns->size > 32)
-        expand(ns); /* Can fail. Just continue with the bigger hashmap. */
-    return UA_STATUSCODE_GOOD;
-}
-
 static UA_NodeMapSlot *
 findOccupiedSlot(const UA_NodeMap *ns, const UA_NodeId *nodeid) {
     UA_UInt32 h = UA_NodeId_hash(nodeid);
@@ -220,16 +202,13 @@ findOccupiedSlot(const UA_NodeMap *ns, const UA_NodeId *nodeid) {
 
     do {
         UA_NodeMapSlot *slot= &ns->slots[(UA_UInt32)idx];
-
-        /* Found a candidate */
         if(slot->entry > UA_NODEMAP_TOMBSTONE) {
             if(slot->nodeIdHash == h &&
                UA_NodeId_equal(&slot->entry->node.nodeId, nodeid))
                 return slot;
         } else {
-            /* No entry can be found afterwards */
             if(slot->entry == NULL)
-                return NULL;
+                return NULL; /* No further entry possible */
         }
 
         idx += hash2;
@@ -305,12 +284,20 @@ static UA_StatusCode
 UA_NodeMap_removeNode(void *context, const UA_NodeId *nodeid) {
     UA_NodeMap *ns = (UA_NodeMap*)context;
     UA_NodeMapSlot *slot = findOccupiedSlot(ns, nodeid);
-    UA_StatusCode retval = UA_STATUSCODE_GOOD;
-    if(slot)
-        retval = clearSlot(ns, slot);
-    else
-        retval = UA_STATUSCODE_BADNODEIDUNKNOWN;
-    return retval;
+    if(!slot)
+        return UA_STATUSCODE_BADNODEIDUNKNOWN;
+
+    UA_NodeMapEntry *entry = slot->entry;
+    slot->entry = UA_NODEMAP_TOMBSTONE;
+    UA_atomic_sync(); /* Set the tombstone before cleaning up. E.g. if the
+                       * nodestore is accessed from an interrupt. */
+    entry->deleted = true;
+    cleanupNodeMapEntry(entry);
+    --ns->count;
+    /* Downsize the hashmap if it is very empty */
+    if(ns->count * 8 < ns->size && ns->size > UA_NODEMAP_MINSIZE)
+        expand(ns); /* Can fail. Just continue with the bigger hashmap. */
+    return UA_STATUSCODE_GOOD;
 }
 
 static UA_StatusCode
@@ -367,17 +354,11 @@ UA_NodeMap_insertNode(void *context, UA_Node *node,
         }
     }
 
-    /* Store the hash */
+    /* Insert the node */
     UA_NodeMapEntry *newEntry = container_of(node, UA_NodeMapEntry, node);
     slot->nodeIdHash = UA_NodeId_hash(&node->nodeId);
-
-    /* Insert the node */
-    UA_NodeMapEntry *oldEntry = slot->entry;
-    if(UA_atomic_cmpxchg((void**)&slot->entry, oldEntry, newEntry) != oldEntry) {
-        deleteNodeMapEntry(container_of(node, UA_NodeMapEntry, node));
-        return UA_STATUSCODE_BADNODEIDEXISTS;
-    }
-
+    UA_atomic_sync(); /* Set the hash first */
+    slot->entry = newEntry;
     ++ns->count;
     return retval;
 }
@@ -393,20 +374,17 @@ UA_NodeMap_replaceNode(void *context, UA_Node *node) {
         deleteNodeMapEntry(newEntry);
         return UA_STATUSCODE_BADNODEIDUNKNOWN;
     }
-    UA_NodeMapEntry *oldEntry = slot->entry;
 
     /* The node was already updated since the copy was made? */
+    UA_NodeMapEntry *oldEntry = slot->entry;
     if(oldEntry != newEntry->orig) {
         deleteNodeMapEntry(newEntry);
         return UA_STATUSCODE_BADINTERNALERROR;
     }
 
-    /* Replace the entry with an atomic operation */
-    if(UA_atomic_cmpxchg((void**)&slot->entry, oldEntry, newEntry) != oldEntry) {
-        deleteNodeMapEntry(newEntry);
-        return UA_STATUSCODE_BADINTERNALERROR;
-    }
-
+    /* Replace the entry */
+    slot->entry = newEntry;
+    UA_atomic_sync();
     oldEntry->deleted = true;
     cleanupNodeMapEntry(oldEntry);
     return UA_STATUSCODE_GOOD;
@@ -473,6 +451,5 @@ UA_Nodestore_HashMap(UA_Nodestore *ns) {
     ns->replaceNode = UA_NodeMap_replaceNode;
     ns->removeNode = UA_NodeMap_removeNode;
     ns->iterate = UA_NodeMap_iterate;
-
     return UA_STATUSCODE_GOOD;
 }