Explorar el Código

finalize improved handling of timed work

Julius Pfrommer hace 10 años
padre
commit
60d636db85

+ 1 - 1
examples/networklayer_tcp.c

@@ -329,7 +329,7 @@ static UA_Int32 ServerNetworkLayerTCP_getWork(ServerNetworkLayerTCP *layer, UA_W
 #endif
     removeMappings(layer, deletes);
     setFDSet(layer);
-    struct timeval tmptv = {0, timeout * 1000};
+    struct timeval tmptv = {0, timeout};
     UA_Int32 resultsize = select(layer->highestfd+1, &layer->fdset, NULL, NULL, &tmptv);
     UA_WorkItem *items;
     if(resultsize < 0 || !(items = malloc(sizeof(UA_WorkItem)*(resultsize+1)))) {

+ 1 - 1
include/ua_server.h

@@ -197,7 +197,7 @@ typedef struct {
      *
      * @param workItems When the returned integer is positive, *workItems points
      * to an array of WorkItems of the returned size.
-     * @param timeout The timeout during which an event must arrive.
+     * @param timeout The timeout during which an event must arrive in microseconds
      * @return The size of the returned workItems array. If the result is
      * negative, an error has occured.
      */

+ 4 - 1
src/server/ua_server.c

@@ -713,11 +713,11 @@ UA_Server * UA_Server_new(UA_ServerConfig config) {
         UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_FOLDERTYPE));
     ADDREFERENCE(UA_NODEID_NUMERIC(0, UA_NS0ID_VIEWSFOLDER), UA_NODEID_NUMERIC(0, UA_NS0ID_HASTYPEDEFINITION),
         UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_FOLDERTYPE));
+    addObjectTypeNode(server, "ServerType", UA_NS0ID_SERVERTYPE, UA_NS0ID_BASEOBJECTTYPE, UA_NS0ID_HASSUBTYPE);
     addObjectTypeNode(server, "ServerDiagnosticsType", UA_NS0ID_SERVERDIAGNOSTICSTYPE, UA_NS0ID_BASEOBJECTTYPE, UA_NS0ID_HASSUBTYPE);
     addObjectTypeNode(server, "ServerCapatilitiesType", UA_NS0ID_SERVERCAPABILITIESTYPE, UA_NS0ID_BASEOBJECTTYPE, UA_NS0ID_HASSUBTYPE);
     addObjectTypeNode(server, "ServerStatusType", UA_NS0ID_SERVERSTATUSTYPE, UA_NS0ID_BASEOBJECTTYPE, UA_NS0ID_HASSUBTYPE);
 
-
     /**************/
     /* Data Types */
     /**************/
@@ -784,6 +784,9 @@ UA_Server * UA_Server_new(UA_ServerConfig config) {
    UA_Server_addNode(server, (UA_Node*)servernode,
            &UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
            &UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES));
+   ADDREFERENCE(UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER),
+       UA_NODEID_NUMERIC(0, UA_NS0ID_HASTYPEDEFINITION),
+       UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_SERVERTYPE));
 
    UA_VariableNode *namespaceArray = UA_VariableNode_new();
    copyNames((UA_Node*)namespaceArray, "NamespaceArray");

+ 33 - 26
src/server/ua_server_worker.c

@@ -12,7 +12,7 @@
  *    all previous work has actually finished (only for multithreading)
  */
 
-#define MAXTIMEOUT 50000 // max timeout in usec until the next main loop iteration
+#define MAXTIMEOUT 5000 // max timeout in microsec until the next main loop iteration
 #define BATCHSIZE 20 // max size of worklists that are dispatched to workers
 
 static void processWork(UA_Server *server, UA_WorkItem *work, size_t workSize) {
@@ -140,19 +140,16 @@ struct TimedWork {
     UA_DateTime nextTime;
     UA_UInt32 interval; ///> in ms resolution, 0 means no repetition
     size_t workSize;
-    struct {
-        UA_WorkItem work;
-        UA_Guid workId;
-    } work[];
+    UA_WorkItem *work;
+    UA_Guid workIds[];
 };
 
-/* The item is copied and not freed by this function. */
+/* The item is copied and not freed by this function. The interval is in 100ns (as UA_DateTime) */
 static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA_DateTime firstTime,
-                                  UA_UInt32 repetitionInterval, UA_Guid *resultWorkGuid) {
+                                  UA_UInt32 interval, UA_Guid *resultWorkGuid) {
     struct TimedWork *lastTw = UA_NULL, *matchingTw = UA_NULL;
-
     /* search for matching entry */
-    if(repetitionInterval == 0) {
+    if(interval == 0) {
         LIST_FOREACH(lastTw, &server->timedWork, pointers) {
             if(lastTw->nextTime == firstTime) {
                 if(lastTw->nextTime == firstTime)
@@ -162,7 +159,7 @@ static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA
         }
     } else {
         LIST_FOREACH(matchingTw, &server->timedWork, pointers) {
-            if(repetitionInterval == matchingTw->interval)
+            if(interval == matchingTw->interval)
                 break;
         }
     }
@@ -170,31 +167,40 @@ static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA
     struct TimedWork *newWork;
     if(matchingTw) {
         /* append to matching entry */
-        newWork = UA_realloc(matchingTw, sizeof(struct TimedWork) + (sizeof(UA_WorkItem)*matchingTw->workSize + 1));
+        newWork = UA_realloc(matchingTw, sizeof(struct TimedWork) + sizeof(UA_Guid)*(matchingTw->workSize + 1));
         if(!newWork)
             return UA_STATUSCODE_BADOUTOFMEMORY;
         if(newWork->pointers.le_next)
             newWork->pointers.le_next->pointers.le_prev = &newWork->pointers.le_next;
         if(newWork->pointers.le_prev)
             *newWork->pointers.le_prev = newWork;
+        UA_WorkItem *newItems = UA_realloc(newWork->work, sizeof(UA_WorkItem)*(matchingTw->workSize + 1));
+        if(!newItems)
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+        newWork->work = newItems;
     } else {
         /* create a new entry */
-        newWork = UA_malloc(sizeof(struct TimedWork) + sizeof(UA_WorkItem));
+        newWork = UA_malloc(sizeof(struct TimedWork) + sizeof(UA_Guid));
         if(!newWork)
             return UA_STATUSCODE_BADOUTOFMEMORY;
+        newWork->work = UA_malloc(sizeof(UA_WorkItem));
+        if(!newWork->work) {
+            UA_free(newWork);
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+        }
         newWork->workSize = 0;
         newWork->nextTime = firstTime;
-        newWork->interval = repetitionInterval;
+        newWork->interval = interval;
         if(lastTw)
             LIST_INSERT_AFTER(lastTw, newWork, pointers);
         else
             LIST_INSERT_HEAD(&server->timedWork, newWork, pointers);
     }
-    newWork->work[newWork->workSize].work = *item;
     if(resultWorkGuid) {
-        newWork->work[newWork->workSize].workId = UA_Guid_random(&server->random_seed);
-        *resultWorkGuid = newWork->work[matchingTw->workSize - 1].workId;
+        newWork->workIds[newWork->workSize] = UA_Guid_random(&server->random_seed);
+        *resultWorkGuid = newWork->workIds[matchingTw->workSize];
     }
+    newWork->work[newWork->workSize] = *item;
     newWork->workSize++;
     return UA_STATUSCODE_GOOD;
 }
@@ -207,7 +213,7 @@ UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *w
 
 UA_StatusCode UA_Server_addRepeatedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_UInt32 interval,
                                             UA_Guid *resultWorkGuid) {
-    return addTimedWork(server, work, UA_DateTime_now() + interval * 1000, interval * 1000, resultWorkGuid);
+    return addTimedWork(server, work, UA_DateTime_now() + interval * 10000, interval * 10000, resultWorkGuid);
 }
 
 /** Dispatches timed work, returns the timeout until the next timed work in ms */
@@ -223,13 +229,12 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
         next = LIST_NEXT(tw, pointers);
 
 #ifdef UA_MULTITHREADING
-        if(tw->repetitionInterval > 0) {
+        if(tw->interval > 0) {
             // copy the entry and insert at the new location
             UA_WorkItem *workCopy = UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
             UA_memcpy(workCopy, tw->work, sizeof(UA_WorkItem) * tw->workSize);
             dispatchWork(server, tw->workSize, workCopy); // frees the work pointer
-            tw->time += tw->repetitionInterval;
-
+            tw->time += tw->interval;
             struct TimedWork *prevTw = tw; // after which tw do we insert?
             while(UA_TRUE) {
                 struct TimedWork *n = LIST_NEXT(prevTw, pointers);
@@ -244,17 +249,17 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
         } else {
             dispatchWork(server, tw->workSize, tw->work); // frees the work pointer
             LIST_REMOVE(tw, pointers);
-            UA_free(tw->workIds);
             UA_free(tw);
         }
 #else
         // 1) Process the work since it is past its due date
-        for(size_t i = 0; i < tw->workSize; i++)
-            processWork(server, &tw->work[i].work, 1);
+        processWork(server, tw->work, tw->workSize); // does not free the work ptr
 
         // 2) If the work is repeated, add it back into the list. Otherwise remove it.
         if(tw->interval > 0) {
-            tw->nextTime += tw->interval * 10;
+            tw->nextTime += tw->interval;
+            if(tw->nextTime < current)
+                tw->nextTime = current;
             struct TimedWork *prevTw = tw;
             while(UA_TRUE) {
                 struct TimedWork *n = LIST_NEXT(prevTw, pointers);
@@ -268,6 +273,7 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
             }
         } else {
             LIST_REMOVE(tw, pointers);
+            UA_free(tw->work);
             UA_free(tw);
         }
 #endif
@@ -277,7 +283,7 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
     struct TimedWork *first = LIST_FIRST(&server->timedWork);
     UA_UInt16 timeout = MAXTIMEOUT;
     if(first) {
-        timeout = (first->nextTime - current)/10000;
+        timeout = (first->nextTime - current)/10;
         if(timeout > MAXTIMEOUT)
             return MAXTIMEOUT;
     }
@@ -291,6 +297,7 @@ void UA_Server_deleteTimedWork(UA_Server *server) {
         current = next;
         next = LIST_NEXT(current, pointers);
         LIST_REMOVE(current, pointers);
+        UA_free(current->workIds);
         UA_free(current);
     }
 }
@@ -433,7 +440,7 @@ UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *r
     for(size_t i = 0; i <server->networkLayersSize; i++)
         server->networkLayers[i].start(server->networkLayers[i].nlHandle, &server->logger);
 
-    //3) The loop
+    // 3) The loop
     while(1) {
         // 3.1) Process timed work
         UA_UInt16 timeout = processTimedWork(server);

+ 0 - 2
src/server/ua_services_session.c

@@ -133,8 +133,6 @@ void Service_ActivateSession(UA_Server *server,UA_SecureChannel *channel,
     //todo cleanup session
     RETURN;
 
-
-
 }
 #undef RETURN