Browse Source

put every repeated job into a separate entry of the linked list

Julius Pfrommer 8 years ago
parent
commit
baa92aea25
2 changed files with 159 additions and 258 deletions
  1. 1 1
      src/server/ua_server_internal.h
  2. 158 257
      src/server/ua_server_worker.c

+ 1 - 1
src/server/ua_server_internal.h

@@ -60,7 +60,7 @@ struct UA_Server {
 #endif
 
     /* Jobs with a repetition interval */
-    LIST_HEAD(RepeatedJobsList, RepeatedJobs) repeatedJobs;
+    LIST_HEAD(RepeatedJobsList, RepeatedJob) repeatedJobs;
 
 #ifdef UA_ENABLE_MULTITHREADING
     /* Dispatch queue head for the worker threads (the tail should not be in the same cache line) */

+ 158 - 257
src/server/ua_server_worker.c

@@ -33,39 +33,36 @@
  */
 
 #define MAXTIMEOUT 50 // max timeout in millisec until the next main loop iteration
-#define BATCHSIZE 20 // max number of jobs that are dispatched at once to workers
 
-static void processJobs(UA_Server *server, UA_Job *jobs, size_t jobsSize) {
+static void
+processJob(UA_Server *server, UA_Job *job) {
     UA_ASSERT_RCU_UNLOCKED();
     UA_RCU_LOCK();
-    for(size_t i = 0; i < jobsSize; i++) {
-        UA_Job *job = &jobs[i];
-        switch(job->type) {
-        case UA_JOBTYPE_NOTHING:
-            break;
-        case UA_JOBTYPE_DETACHCONNECTION:
-            UA_Connection_detachSecureChannel(job->job.closeConnection);
-            break;
-        case UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER:
-            UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
-                                           &job->job.binaryMessage.message);
-            UA_Connection *connection = job->job.binaryMessage.connection;
-            connection->releaseRecvBuffer(connection, &job->job.binaryMessage.message);
-            break;
-        case UA_JOBTYPE_BINARYMESSAGE_ALLOCATED:
-            UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
-                                           &job->job.binaryMessage.message);
-            UA_ByteString_deleteMembers(&job->job.binaryMessage.message);
-            break;
-        case UA_JOBTYPE_METHODCALL:
-        case UA_JOBTYPE_METHODCALL_DELAYED:
-            job->job.methodCall.method(server, job->job.methodCall.data);
-            break;
-        default:
-            UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER,
-                           "Trying to execute a job of unknown type");
-            break;
-        }
+    switch(job->type) {
+    case UA_JOBTYPE_NOTHING:
+        break;
+    case UA_JOBTYPE_DETACHCONNECTION:
+        UA_Connection_detachSecureChannel(job->job.closeConnection);
+        break;
+    case UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER:
+        UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
+                                       &job->job.binaryMessage.message);
+        UA_Connection *connection = job->job.binaryMessage.connection;
+        connection->releaseRecvBuffer(connection, &job->job.binaryMessage.message);
+        break;
+    case UA_JOBTYPE_BINARYMESSAGE_ALLOCATED:
+        UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
+                                       &job->job.binaryMessage.message);
+        UA_ByteString_deleteMembers(&job->job.binaryMessage.message);
+        break;
+    case UA_JOBTYPE_METHODCALL:
+    case UA_JOBTYPE_METHODCALL_DELAYED:
+        job->job.methodCall.method(server, job->job.methodCall.data);
+        break;
+    default:
+        UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "Trying to execute a job of unknown type");
+        break;
     }
     UA_RCU_UNLOCK();
 }
@@ -81,14 +78,13 @@ struct MainLoopJob {
     UA_Job job;
 };
 
-/** Entry in the dispatch queue */
-struct DispatchJobsList {
+struct DispatchJob {
     struct cds_wfcq_node node; // node for the queue
-    size_t jobsSize;
-    UA_Job *jobs;
+    UA_Job job;
 };
 
-static void * workerLoop(UA_Worker *worker) {
+static void *
+workerLoop(UA_Worker *worker) {
     UA_Server *server = worker->server;
     UA_UInt32 *counter = &worker->counter;
     volatile UA_Boolean *running = &worker->running;
@@ -98,21 +94,19 @@ static void * workerLoop(UA_Worker *worker) {
     rcu_register_thread();
 
     pthread_mutex_t mutex; // required for the condition variable
-    pthread_mutex_init(&mutex,0);
+    pthread_mutex_init(&mutex, 0);
     pthread_mutex_lock(&mutex);
 
     while(*running) {
-        struct DispatchJobsList *wln = (struct DispatchJobsList*)
+        struct DispatchJob *dj = (struct DispatchJob*)
             cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
-        if(!wln) {
-            uatomic_inc(counter);
-            /* sleep until a work arrives (and wakes up all worker threads) */
+        if(dj) {
+            processJob(server, &dj->job);
+            UA_free(dj);
+        } else {
+            /* nothing to do. sleep until a job is dispatched (and wakes up all worker threads) */
             pthread_cond_wait(&server->dispatchQueue_condition, &mutex);
-            continue;
         }
-        processJobs(server, wln->jobs, wln->jobsSize);
-        UA_free(wln->jobs);
-        UA_free(wln);
         uatomic_inc(counter);
     }
 
@@ -124,39 +118,21 @@ static void * workerLoop(UA_Worker *worker) {
     return NULL;
 }
 
-/** Dispatch jobs to workers. Slices the job array up if it contains more than
-    BATCHSIZE items. The jobs array is freed in the worker threads. */
-static void dispatchJobs(UA_Server *server, UA_Job *jobs, size_t jobsSize) {
-    size_t startIndex = jobsSize; // start at the end
-    while(jobsSize > 0) {
-        size_t size = BATCHSIZE;
-        if(size > jobsSize)
-            size = jobsSize;
-        startIndex = startIndex - size;
-        struct DispatchJobsList *wln = UA_malloc(sizeof(struct DispatchJobsList));
-        if(startIndex > 0) {
-            wln->jobs = UA_malloc(size * sizeof(UA_Job));
-            memcpy(wln->jobs, &jobs[startIndex], size * sizeof(UA_Job));
-            wln->jobsSize = size;
-        } else {
-            /* forward the original array */
-            wln->jobsSize = size;
-            wln->jobs = jobs;
-        }
-        cds_wfcq_node_init(&wln->node);
-        cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &wln->node);
-        jobsSize -= size;
-    }
+static void
+dispatchJob(UA_Server *server, const UA_Job *job) {
+    struct DispatchJob *dj = UA_malloc(sizeof(struct DispatchJob));
+    dj->job = *job;
+    cds_wfcq_node_init(&dj->node);
+    cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &dj->node);
 }
 
 static void
 emptyDispatchQueue(UA_Server *server) {
     while(!cds_wfcq_empty(&server->dispatchQueue_head, &server->dispatchQueue_tail)) {
-        struct DispatchJobsList *wln = (struct DispatchJobsList*)
+        struct DispatchJob *dj = (struct DispatchJob*)
             cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
-        processJobs(server, wln->jobs, wln->jobsSize);
-        UA_free(wln->jobs);
-        UA_free(wln);
+        processJob(server, &dj->job);
+        UA_free(dj);
     }
 }
 
@@ -166,183 +142,110 @@ emptyDispatchQueue(UA_Server *server) {
 /* Repeated Jobs */
 /*****************/
 
-struct IdentifiedJob {
-    UA_Job job;
-    UA_Guid id;
-};
-
-/**
- * The RepeatedJobs structure contains an array of jobs that are either executed with the same
- * repetition interval. The linked list is sorted, so we can stop traversing when the first element
- * has nextTime > now.
- */
-struct RepeatedJobs {
-    LIST_ENTRY(RepeatedJobs) pointers; ///> Links to the next list of repeated jobs (with a different) interval
-    UA_DateTime nextTime; ///> The next time when the jobs are to be executed
-    UA_UInt32 interval; ///> Interval in 100ns resolution
-    size_t jobsSize; ///> Number of jobs contained
-    struct IdentifiedJob jobs[]; ///> The jobs. This is not a pointer, instead the struct is variable sized.
-};
-
-/* throwaway struct for the mainloop callback */
-struct AddRepeatedJob {
-    struct IdentifiedJob job;
-    UA_UInt32 interval;
+/* The linked list of jobs is sorted according to the next execution timestamp */
+struct RepeatedJob {
+    LIST_ENTRY(RepeatedJob) next;  /* Next element in the list */
+    UA_DateTime nextTime;          /* The next time when the jobs are to be executed */
+    UA_UInt32 interval;            /* Interval in 100ns resolution */
+    UA_Guid id;                    /* Id of the repeated job */
+    UA_Job job;                    /* The job description itself */
 };
 
 /* internal. call only from the main loop. */
-static UA_StatusCode addRepeatedJob(UA_Server *server, struct AddRepeatedJob * UA_RESTRICT arw) {
-    struct RepeatedJobs *matchingTw = NULL; // add the item here
-    struct RepeatedJobs *lastTw = NULL; // if there is no repeated job, add a new one this entry
-    struct RepeatedJobs *tempTw;
-    UA_StatusCode retval = UA_STATUSCODE_GOOD;
-
+static void
+addRepeatedJob(UA_Server *server, struct RepeatedJob * UA_RESTRICT rj)
+{
     /* search for matching entry */
-    UA_DateTime firstTime = UA_DateTime_nowMonotonic() + arw->interval;
-    tempTw = LIST_FIRST(&server->repeatedJobs);
-    while(tempTw) {
-        if(arw->interval == tempTw->interval) {
-            matchingTw = tempTw;
-            break;
-        }
-        if(tempTw->nextTime > firstTime)
+    struct RepeatedJob *lastRj = NULL; /* Add after this entry or at LIST_HEAD if NULL */
+    struct RepeatedJob *tempRj = LIST_FIRST(&server->repeatedJobs);
+    while(tempRj) {
+        if(tempRj->nextTime > rj->nextTime)
             break;
-        lastTw = tempTw;
-        tempTw = LIST_NEXT(lastTw, pointers);
-    }
-
-    if(matchingTw) {
-        /* append to matching entry */
-        matchingTw = UA_realloc(matchingTw, sizeof(struct RepeatedJobs) +
-                                (sizeof(struct IdentifiedJob) * (matchingTw->jobsSize + 1)));
-        if(!matchingTw) {
-            retval = UA_STATUSCODE_BADOUTOFMEMORY;
-            goto cleanup;
-        }
-        /* link the reallocated tw into the list */
-        LIST_REPLACE(matchingTw, matchingTw, pointers);
-    } else {
-        /* create a new entry */
-        matchingTw = UA_malloc(sizeof(struct RepeatedJobs) + sizeof(struct IdentifiedJob));
-        if(!matchingTw) {
-            retval = UA_STATUSCODE_BADOUTOFMEMORY;
-            goto cleanup;
-        }
-        matchingTw->jobsSize = 0;
-        matchingTw->nextTime = firstTime;
-        matchingTw->interval = arw->interval;
-        if(lastTw)
-            LIST_INSERT_AFTER(lastTw, matchingTw, pointers);
-        else
-            LIST_INSERT_HEAD(&server->repeatedJobs, matchingTw, pointers);
+        lastRj = tempRj;
+        tempRj = LIST_NEXT(lastRj, next);
     }
-    matchingTw->jobs[matchingTw->jobsSize] = arw->job;
-    matchingTw->jobsSize++;
 
- cleanup:
-#ifdef UA_ENABLE_MULTITHREADING
-    UA_free(arw);
-#endif
-    return retval;
+    /* add the repeated job */
+    if(lastRj)
+        LIST_INSERT_AFTER(lastRj, rj, next);
+    else
+        LIST_INSERT_HEAD(&server->repeatedJobs, rj, next);
 }
 
-UA_StatusCode UA_Server_addRepeatedJob(UA_Server *server, UA_Job job, UA_UInt32 interval, UA_Guid *jobId) {
+UA_StatusCode
+UA_Server_addRepeatedJob(UA_Server *server, UA_Job job,
+                         UA_UInt32 interval, UA_Guid *jobId) {
     /* the interval needs to be at least 5ms */
     if(interval < 5)
         return UA_STATUSCODE_BADINTERNALERROR;
     interval *= (UA_UInt32)UA_MSEC_TO_DATETIME; // from ms to 100ns resolution
 
-#ifdef UA_ENABLE_MULTITHREADING
-    struct AddRepeatedJob *arw = UA_malloc(sizeof(struct AddRepeatedJob));
-    if(!arw)
+    /* Create and fill the repeated job structure */
+    struct RepeatedJob *rj = UA_malloc(sizeof(struct RepeatedJob));
+    if(!rj)
         return UA_STATUSCODE_BADOUTOFMEMORY;
+    rj->nextTime = UA_DateTime_nowMonotonic() + interval;
+    rj->interval = interval;
+    rj->id = UA_Guid_random();
+    rj->job = job;
 
-    arw->interval = interval;
-    arw->job.job = job;
-    if(jobId) {
-        arw->job.id = UA_Guid_random();
-        *jobId = arw->job.id;
-    } else
-        UA_Guid_init(&arw->job.id);
-
+#ifdef UA_ENABLE_MULTITHREADING
+    /* Call addRepeatedJob from the main loop */
     struct MainLoopJob *mlw = UA_malloc(sizeof(struct MainLoopJob));
     if(!mlw) {
-        UA_free(arw);
+        UA_free(rj);
         return UA_STATUSCODE_BADOUTOFMEMORY;
     }
     mlw->job = (UA_Job) {
         .type = UA_JOBTYPE_METHODCALL,
-        .job.methodCall = {.data = arw, .method = (void (*)(UA_Server*, void*))addRepeatedJob}};
+        .job.methodCall = {.data = rj, .method = (void (*)(UA_Server*, void*))addRepeatedJob}};
     cds_lfs_push(&server->mainLoopJobs, &mlw->node);
 #else
-    struct AddRepeatedJob arw;
-    arw.interval = interval;
-    arw.job.job = job;
-    if(jobId) {
-        arw.job.id = UA_Guid_random();
-        *jobId = arw.job.id;
-    } else
-        UA_Guid_init(&arw.job.id);
-    addRepeatedJob(server, &arw);
+    /* Add directly */
+    addRepeatedJob(server, rj);
 #endif
+    if(jobId)
+        *jobId = rj->id;
     return UA_STATUSCODE_GOOD;
 }
 
 /* Returns the next datetime when a repeated job is scheduled */
-static UA_DateTime processRepeatedJobs(UA_Server *server, UA_DateTime current) {
-    struct RepeatedJobs *tw, *tmp_tw;
+static UA_DateTime
+processRepeatedJobs(UA_Server *server, UA_DateTime current) {
+    struct RepeatedJob *rj, *tmp_rj;
     /* Iterate over the list of elements (sorted according to the next execution timestamp) */
-    LIST_FOREACH_SAFE(tw, &server->repeatedJobs, pointers, tmp_tw) {
-        if(tw->nextTime > current)
+    LIST_FOREACH_SAFE(rj, &server->repeatedJobs, next, tmp_rj) {
+        if(rj->nextTime > current)
             break;
 
+        /* Dispatch/process job */
 #ifdef UA_ENABLE_MULTITHREADING
-        // copy the entry and insert at the new location
-        UA_Job *jobsCopy = UA_malloc(sizeof(UA_Job) * tw->jobsSize);
-        if(!jobsCopy) {
-            UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
-                         "Not enough memory to dispatch delayed jobs");
-            break;
-        }
-        for(size_t i=0;i<tw->jobsSize;i++)
-            jobsCopy[i] = tw->jobs[i].job;
-        dispatchJobs(server, jobsCopy, tw->jobsSize); // frees the job pointer
+        dispatchJob(server, &rj->job);
 #else
-        size_t size = tw->jobsSize;
-        for(size_t i = 0; i < size; i++)
-            processJobs(server, &tw->jobs[i].job, 1); // does not free the job ptr
+        processJob(server, &rj->job);
 #endif
 
-        /* Elements are removed only here. Check if empty. */
-        if(tw->jobsSize == 0) {
-            LIST_REMOVE(tw, pointers);
-            UA_free(tw);
-            UA_assert(LIST_FIRST(&server->repeatedJobs) != tw); /* Assert for static code checkers */
-            continue;
-        }
-
         /* Set the time for the next execution */
-        tw->nextTime += tw->interval;
-        if(tw->nextTime < current)
-            tw->nextTime = current;
+        rj->nextTime += rj->interval;
+        if(rj->nextTime < current)
+            rj->nextTime = current;
 
-        /* Reinsert to keep the list sorted */
-        struct RepeatedJobs *prevTw = LIST_FIRST(&server->repeatedJobs);
+        /* Keep the list sorted */
+        struct RepeatedJob *prev_rj = LIST_FIRST(&server->repeatedJobs);
         while(true) {
-            struct RepeatedJobs *n = LIST_NEXT(prevTw, pointers);
-            if(!n || n->nextTime > tw->nextTime)
+            struct RepeatedJob *n = LIST_NEXT(prev_rj, next);
+            if(!n || n->nextTime > rj->nextTime)
                 break;
-            prevTw = n;
+            prev_rj = n;
         }
-        if(prevTw != tw) {
-            LIST_REMOVE(tw, pointers);
-            LIST_INSERT_AFTER(prevTw, tw, pointers);
+        if(prev_rj != rj) {
+            LIST_REMOVE(rj, next);
+            LIST_INSERT_AFTER(prev_rj, rj, next);
         }
     }
 
-    // check if the next repeated job is sooner than the usual timeout
-    // calc in 32 bit must be ok
-    struct RepeatedJobs *first = LIST_FIRST(&server->repeatedJobs);
+    /* Check if the next repeated job is sooner than the usual timeout */
+    struct RepeatedJob *first = LIST_FIRST(&server->repeatedJobs);
     UA_DateTime next = current + (MAXTIMEOUT * UA_MSEC_TO_DATETIME);
     if(first && first->nextTime < next)
         next = first->nextTime;
@@ -350,23 +253,19 @@ static UA_DateTime processRepeatedJobs(UA_Server *server, UA_DateTime current) {
 }
 
 /* Call this function only from the main loop! */
-static void removeRepeatedJob(UA_Server *server, UA_Guid *jobId) {
-    struct RepeatedJobs *tw;
-    LIST_FOREACH(tw, &server->repeatedJobs, pointers) {
-        for(size_t i = 0; i < tw->jobsSize; i++) {
-            if(!UA_Guid_equal(jobId, &tw->jobs[i].id))
-                continue;
-            tw->jobsSize--; /* if size == 0, tw is freed during the next processing */
-            if(tw->jobsSize > 0)
-                tw->jobs[i] = tw->jobs[tw->jobsSize]; // move the last entry to overwrite
-            goto finish;
-        }
+static void
+removeRepeatedJob(UA_Server *server, UA_Guid *jobId) {
+    struct RepeatedJob *rj;
+    LIST_FOREACH(rj, &server->repeatedJobs, next) {
+        if(!UA_Guid_equal(jobId, &rj->id))
+            continue;
+        LIST_REMOVE(rj, next);
+        UA_free(rj);
+        break;
     }
- finish:
 #ifdef UA_ENABLE_MULTITHREADING
     UA_free(jobId);
 #endif
-    return;
 }
 
 UA_StatusCode UA_Server_removeRepeatedJob(UA_Server *server, UA_Guid jobId) {
@@ -388,9 +287,9 @@ UA_StatusCode UA_Server_removeRepeatedJob(UA_Server *server, UA_Guid jobId) {
 }
 
 void UA_Server_deleteAllRepeatedJobs(UA_Server *server) {
-    struct RepeatedJobs *current, *temp;
-    LIST_FOREACH_SAFE(current, &server->repeatedJobs, pointers, temp) {
-        LIST_REMOVE(current, pointers);
+    struct RepeatedJob *current, *temp;
+    LIST_FOREACH_SAFE(current, &server->repeatedJobs, next, temp) {
+        LIST_REMOVE(current, next);
         UA_free(current);
     }
 }
@@ -418,9 +317,9 @@ static void getCounters(UA_Server *server, struct DelayedJobs *delayed) {
     delayed->workerCounters = counters;
 }
 
-// Call from the main thread only. This is the only function that modifies
-// server->delayedWork. processDelayedWorkQueue modifies the "next" (after the
-// head).
+/* Call from the main thread only. This is the only function that modifies */
+/* server->delayedWork. processDelayedWorkQueue modifies the "next" (after the */
+/* head). */
 static void addDelayedJob(UA_Server *server, UA_Job *job) {
     struct DelayedJobs *dj = server->delayedJobs;
     if(!dj || dj->jobsCount >= DELAYEDJOBSSIZE) {
@@ -438,37 +337,29 @@ static void addDelayedJob(UA_Server *server, UA_Job *job) {
 
         /* dispatch a method that sets the counter for the full list that comes afterwards */
         if(dj->next) {
-            UA_Job *setCounter = UA_malloc(sizeof(UA_Job));
-            *setCounter = (UA_Job) {.type = UA_JOBTYPE_METHODCALL, .job.methodCall =
-                                    {.method = (void (*)(UA_Server*, void*))getCounters, .data = dj->next}};
-            dispatchJobs(server, setCounter, 1);
+            UA_Job setCounter = (UA_Job){
+                .type = UA_JOBTYPE_METHODCALL, .job.methodCall =
+                {.method = (void (*)(UA_Server*, void*))getCounters, .data = dj->next}};
+            dispatchJob(server, &setCounter);
         }
     }
     dj->jobs[dj->jobsCount] = *job;
     dj->jobsCount++;
 }
 
-static void addDelayedJobAsync(UA_Server *server, UA_Job *job) {
-    addDelayedJob(server, job);
-    UA_free(job);
-}
-
-static void server_free(UA_Server *server, void *data) {
+static void
+delayed_free(UA_Server *server, void *data) {
     UA_free(data);
 }
 
 UA_StatusCode UA_Server_delayedFree(UA_Server *server, void *data) {
-    UA_Job *j = UA_malloc(sizeof(UA_Job));
-    if(!j)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-    j->type = UA_JOBTYPE_METHODCALL;
-    j->job.methodCall.data = data;
-    j->job.methodCall.method = server_free;
-    struct MainLoopJob *mlw = UA_malloc(sizeof(struct MainLoopJob));
-    mlw->job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL, .job.methodCall =
-                         {.data = j, .method = (UA_ServerCallback)addDelayedJobAsync}};
-    cds_lfs_push(&server->mainLoopJobs, &mlw->node);
-    return UA_STATUSCODE_GOOD;
+    return UA_Server_delayedCallback(server, delayed_free, data);
+}
+
+static void
+addDelayedJobAsync(UA_Server *server, UA_Job *job) {
+    addDelayedJob(server, job);
+    UA_free(job);
 }
 
 UA_StatusCode
@@ -522,10 +413,11 @@ dispatchDelayedJobs(UA_Server *server, void *_) {
 #endif
     /* process and free all delayed jobs from here on */
     while(dw) {
-        processJobs(server, dw->jobs, dw->jobsCount);
+        for(size_t i = 0; i < dw->jobsCount; i++)
+            processJob(server, &dw->jobs[i]);
         struct DelayedJobs *next = uatomic_xchg(&beforedw->next, NULL);
-        UA_free(dw);
         UA_free(dw->workerCounters);
+        UA_free(dw);
         dw = next;
     }
 #if (__GNUC__ <= 4 && __GNUC_MINOR__ <= 6)
@@ -549,12 +441,11 @@ static void processMainLoopJobs(UA_Server *server) {
     struct MainLoopJob *mlw = (struct MainLoopJob*)&head->node;
     struct MainLoopJob *next;
     do {
-        processJobs(server, &mlw->job, 1);
+        processJob(server, &mlw->job);
         next = (struct MainLoopJob*)mlw->node.next;
         UA_free(mlw);
         //cppcheck-suppress unreadVariable
     } while((mlw = next));
-    //UA_free(head);
 }
 #endif
 
@@ -591,7 +482,10 @@ UA_StatusCode UA_Server_run_startup(UA_Server *server) {
     return result;
 }
 
-static void completeMessages(UA_Server *server, UA_Job *job) {
+/* completeMessages is run synchronous on the jobs returned from the network
+   layer, so that the order for processing TCP packets is never mixed up. */
+static void
+completeMessages(UA_Server *server, UA_Job *job) {
     UA_Boolean realloced = UA_FALSE;
     UA_StatusCode retval = UA_Connection_completeMessages(job->job.binaryMessage.connection,
                                                           &job->job.binaryMessage.message, &realloced);
@@ -653,16 +547,22 @@ UA_UInt16 UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
                 completeMessages(server, &jobs[k]);
         }
 
+        /* Dispatch/process jobs */
+        for(size_t j = 0; j < jobsSize; j++) {
 #ifdef UA_ENABLE_MULTITHREADING
-        dispatchJobs(server, jobs, jobsSize);
-        /* Wake up worker threads */
-        if(jobsSize > 0)
-            pthread_cond_broadcast(&server->dispatchQueue_condition);
+            dispatchJob(server, &jobs[j]);
 #else
-        processJobs(server, jobs, jobsSize);
-        if(jobsSize > 0)
-            UA_free(jobs);
+            processJob(server, &jobs[j]);
 #endif
+        }
+
+        if(jobsSize > 0) {
+#ifdef UA_ENABLE_MULTITHREADING
+            /* Wake up worker threads */
+            pthread_cond_broadcast(&server->dispatchQueue_condition);
+#endif
+            UA_free(jobs);
+        }
     }
 
     now = UA_DateTime_nowMonotonic();
@@ -677,7 +577,8 @@ UA_StatusCode UA_Server_run_shutdown(UA_Server *server) {
         UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
         UA_Job *stopJobs;
         size_t stopJobsSize = nl->stop(nl, &stopJobs);
-        processJobs(server, stopJobs, stopJobsSize);
+        for(size_t j = 0; j < stopJobsSize; j++)
+            processJob(server, &stopJobs[j]);
         UA_free(stopJobs);
     }