Преглед на файлове

Merge pull request #771 from open62541/subscriptions_test

Subscriptions test
Julius Pfrommer преди 8 години
родител
ревизия
5a6f046259

+ 3 - 6
include/ua_log.h

@@ -107,12 +107,9 @@ typedef void (*UA_Logger)(UA_LogLevel level, UA_LogCategory category,
  * Convenience macros for complex types
  * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
 #define UA_PRINTF_GUID_FORMAT "{%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X}"
-#define UA_PRINTF_GUID_DATA(GUID) (GUID).identifier.guid.data1,     \
-        (GUID).identifier.guid.data2, (GUID).identifier.guid.data3, \
-        (GUID).identifier.guid.data4[0], (GUID).identifier.guid.data4[1], \
-        (GUID).identifier.guid.data4[2], (GUID).identifier.guid.data4[3], \
-        (GUID).identifier.guid.data4[4], (GUID).identifier.guid.data4[5], \
-        (GUID).identifier.guid.data4[6], (GUID).identifier.guid.data4[7]
+#define UA_PRINTF_GUID_DATA(GUID) (GUID).data1, (GUID).data2, (GUID).data3, \
+        (GUID).data4[0], (GUID).data4[1], (GUID).data4[2], (GUID).data4[3], \
+        (GUID).data4[4], (GUID).data4[5], (GUID).data4[6], (GUID).data4[7]
 
 #define UA_PRINTF_STRING_FORMAT "\"%.*s\""
 #define UA_PRINTF_STRING_DATA(STRING) (STRING).length, (STRING).data

+ 1 - 1
src/server/ua_server_internal.h

@@ -60,7 +60,7 @@ struct UA_Server {
 #endif
 
     /* Jobs with a repetition interval */
-    LIST_HEAD(RepeatedJobsList, RepeatedJobs) repeatedJobs;
+    LIST_HEAD(RepeatedJobsList, RepeatedJob) repeatedJobs;
 
 #ifdef UA_ENABLE_MULTITHREADING
     /* Dispatch queue head for the worker threads (the tail should not be in the same cache line) */

+ 158 - 257
src/server/ua_server_worker.c

@@ -33,39 +33,36 @@
  */
 
 #define MAXTIMEOUT 50 // max timeout in millisec until the next main loop iteration
-#define BATCHSIZE 20 // max number of jobs that are dispatched at once to workers
 
-static void processJobs(UA_Server *server, UA_Job *jobs, size_t jobsSize) {
+static void
+processJob(UA_Server *server, UA_Job *job) {
     UA_ASSERT_RCU_UNLOCKED();
     UA_RCU_LOCK();
-    for(size_t i = 0; i < jobsSize; i++) {
-        UA_Job *job = &jobs[i];
-        switch(job->type) {
-        case UA_JOBTYPE_NOTHING:
-            break;
-        case UA_JOBTYPE_DETACHCONNECTION:
-            UA_Connection_detachSecureChannel(job->job.closeConnection);
-            break;
-        case UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER:
-            UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
-                                           &job->job.binaryMessage.message);
-            UA_Connection *connection = job->job.binaryMessage.connection;
-            connection->releaseRecvBuffer(connection, &job->job.binaryMessage.message);
-            break;
-        case UA_JOBTYPE_BINARYMESSAGE_ALLOCATED:
-            UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
-                                           &job->job.binaryMessage.message);
-            UA_ByteString_deleteMembers(&job->job.binaryMessage.message);
-            break;
-        case UA_JOBTYPE_METHODCALL:
-        case UA_JOBTYPE_METHODCALL_DELAYED:
-            job->job.methodCall.method(server, job->job.methodCall.data);
-            break;
-        default:
-            UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER,
-                           "Trying to execute a job of unknown type");
-            break;
-        }
+    switch(job->type) {
+    case UA_JOBTYPE_NOTHING:
+        break;
+    case UA_JOBTYPE_DETACHCONNECTION:
+        UA_Connection_detachSecureChannel(job->job.closeConnection);
+        break;
+    case UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER:
+        UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
+                                       &job->job.binaryMessage.message);
+        UA_Connection *connection = job->job.binaryMessage.connection;
+        connection->releaseRecvBuffer(connection, &job->job.binaryMessage.message);
+        break;
+    case UA_JOBTYPE_BINARYMESSAGE_ALLOCATED:
+        UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
+                                       &job->job.binaryMessage.message);
+        UA_ByteString_deleteMembers(&job->job.binaryMessage.message);
+        break;
+    case UA_JOBTYPE_METHODCALL:
+    case UA_JOBTYPE_METHODCALL_DELAYED:
+        job->job.methodCall.method(server, job->job.methodCall.data);
+        break;
+    default:
+        UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "Trying to execute a job of unknown type");
+        break;
     }
     UA_RCU_UNLOCK();
 }
@@ -81,14 +78,13 @@ struct MainLoopJob {
     UA_Job job;
 };
 
-/** Entry in the dispatch queue */
-struct DispatchJobsList {
+struct DispatchJob {
     struct cds_wfcq_node node; // node for the queue
-    size_t jobsSize;
-    UA_Job *jobs;
+    UA_Job job;
 };
 
-static void * workerLoop(UA_Worker *worker) {
+static void *
+workerLoop(UA_Worker *worker) {
     UA_Server *server = worker->server;
     UA_UInt32 *counter = &worker->counter;
     volatile UA_Boolean *running = &worker->running;
@@ -98,21 +94,19 @@ static void * workerLoop(UA_Worker *worker) {
     rcu_register_thread();
 
     pthread_mutex_t mutex; // required for the condition variable
-    pthread_mutex_init(&mutex,0);
+    pthread_mutex_init(&mutex, 0);
     pthread_mutex_lock(&mutex);
 
     while(*running) {
-        struct DispatchJobsList *wln = (struct DispatchJobsList*)
+        struct DispatchJob *dj = (struct DispatchJob*)
             cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
-        if(!wln) {
-            uatomic_inc(counter);
-            /* sleep until a work arrives (and wakes up all worker threads) */
+        if(dj) {
+            processJob(server, &dj->job);
+            UA_free(dj);
+        } else {
+            /* Nothing to do: sleep until a job is dispatched (dispatching wakes up all worker threads) */
             pthread_cond_wait(&server->dispatchQueue_condition, &mutex);
-            continue;
         }
-        processJobs(server, wln->jobs, wln->jobsSize);
-        UA_free(wln->jobs);
-        UA_free(wln);
         uatomic_inc(counter);
     }
 
@@ -124,39 +118,21 @@ static void * workerLoop(UA_Worker *worker) {
     return NULL;
 }
 
-/** Dispatch jobs to workers. Slices the job array up if it contains more than
-    BATCHSIZE items. The jobs array is freed in the worker threads. */
-static void dispatchJobs(UA_Server *server, UA_Job *jobs, size_t jobsSize) {
-    size_t startIndex = jobsSize; // start at the end
-    while(jobsSize > 0) {
-        size_t size = BATCHSIZE;
-        if(size > jobsSize)
-            size = jobsSize;
-        startIndex = startIndex - size;
-        struct DispatchJobsList *wln = UA_malloc(sizeof(struct DispatchJobsList));
-        if(startIndex > 0) {
-            wln->jobs = UA_malloc(size * sizeof(UA_Job));
-            memcpy(wln->jobs, &jobs[startIndex], size * sizeof(UA_Job));
-            wln->jobsSize = size;
-        } else {
-            /* forward the original array */
-            wln->jobsSize = size;
-            wln->jobs = jobs;
-        }
-        cds_wfcq_node_init(&wln->node);
-        cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &wln->node);
-        jobsSize -= size;
-    }
+static void
+dispatchJob(UA_Server *server, const UA_Job *job) {
+    struct DispatchJob *dj = UA_malloc(sizeof(struct DispatchJob));
+    dj->job = *job;
+    cds_wfcq_node_init(&dj->node);
+    cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &dj->node);
 }
 
 static void
 emptyDispatchQueue(UA_Server *server) {
     while(!cds_wfcq_empty(&server->dispatchQueue_head, &server->dispatchQueue_tail)) {
-        struct DispatchJobsList *wln = (struct DispatchJobsList*)
+        struct DispatchJob *dj = (struct DispatchJob*)
             cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
-        processJobs(server, wln->jobs, wln->jobsSize);
-        UA_free(wln->jobs);
-        UA_free(wln);
+        processJob(server, &dj->job);
+        UA_free(dj);
     }
 }
 
@@ -166,183 +142,110 @@ emptyDispatchQueue(UA_Server *server) {
 /* Repeated Jobs */
 /*****************/
 
-struct IdentifiedJob {
-    UA_Job job;
-    UA_Guid id;
-};
-
-/**
- * The RepeatedJobs structure contains an array of jobs that are either executed with the same
- * repetition interval. The linked list is sorted, so we can stop traversing when the first element
- * has nextTime > now.
- */
-struct RepeatedJobs {
-    LIST_ENTRY(RepeatedJobs) pointers; ///> Links to the next list of repeated jobs (with a different) interval
-    UA_DateTime nextTime; ///> The next time when the jobs are to be executed
-    UA_UInt32 interval; ///> Interval in 100ns resolution
-    size_t jobsSize; ///> Number of jobs contained
-    struct IdentifiedJob jobs[]; ///> The jobs. This is not a pointer, instead the struct is variable sized.
-};
-
-/* throwaway struct for the mainloop callback */
-struct AddRepeatedJob {
-    struct IdentifiedJob job;
-    UA_UInt32 interval;
+/* The linked list of jobs is sorted according to the next execution timestamp */
+struct RepeatedJob {
+    LIST_ENTRY(RepeatedJob) next;  /* Next element in the list */
+    UA_DateTime nextTime;          /* The next time when the job is to be executed */
+    UA_UInt32 interval;            /* Interval in 100ns resolution */
+    UA_Guid id;                    /* Id of the repeated job */
+    UA_Job job;                    /* The job description itself */
 };
 
 /* internal. call only from the main loop. */
-static UA_StatusCode addRepeatedJob(UA_Server *server, struct AddRepeatedJob * UA_RESTRICT arw) {
-    struct RepeatedJobs *matchingTw = NULL; // add the item here
-    struct RepeatedJobs *lastTw = NULL; // if there is no repeated job, add a new one this entry
-    struct RepeatedJobs *tempTw;
-    UA_StatusCode retval = UA_STATUSCODE_GOOD;
-
+static void
+addRepeatedJob(UA_Server *server, struct RepeatedJob * UA_RESTRICT rj)
+{
     /* search for matching entry */
-    UA_DateTime firstTime = UA_DateTime_nowMonotonic() + arw->interval;
-    tempTw = LIST_FIRST(&server->repeatedJobs);
-    while(tempTw) {
-        if(arw->interval == tempTw->interval) {
-            matchingTw = tempTw;
-            break;
-        }
-        if(tempTw->nextTime > firstTime)
+    struct RepeatedJob *lastRj = NULL; /* Add after this entry or at LIST_HEAD if NULL */
+    struct RepeatedJob *tempRj = LIST_FIRST(&server->repeatedJobs);
+    while(tempRj) {
+        if(tempRj->nextTime > rj->nextTime)
             break;
-        lastTw = tempTw;
-        tempTw = LIST_NEXT(lastTw, pointers);
-    }
-
-    if(matchingTw) {
-        /* append to matching entry */
-        matchingTw = UA_realloc(matchingTw, sizeof(struct RepeatedJobs) +
-                                (sizeof(struct IdentifiedJob) * (matchingTw->jobsSize + 1)));
-        if(!matchingTw) {
-            retval = UA_STATUSCODE_BADOUTOFMEMORY;
-            goto cleanup;
-        }
-        /* link the reallocated tw into the list */
-        LIST_REPLACE(matchingTw, matchingTw, pointers);
-    } else {
-        /* create a new entry */
-        matchingTw = UA_malloc(sizeof(struct RepeatedJobs) + sizeof(struct IdentifiedJob));
-        if(!matchingTw) {
-            retval = UA_STATUSCODE_BADOUTOFMEMORY;
-            goto cleanup;
-        }
-        matchingTw->jobsSize = 0;
-        matchingTw->nextTime = firstTime;
-        matchingTw->interval = arw->interval;
-        if(lastTw)
-            LIST_INSERT_AFTER(lastTw, matchingTw, pointers);
-        else
-            LIST_INSERT_HEAD(&server->repeatedJobs, matchingTw, pointers);
+        lastRj = tempRj;
+        tempRj = LIST_NEXT(lastRj, next);
     }
-    matchingTw->jobs[matchingTw->jobsSize] = arw->job;
-    matchingTw->jobsSize++;
 
- cleanup:
-#ifdef UA_ENABLE_MULTITHREADING
-    UA_free(arw);
-#endif
-    return retval;
+    /* add the repeated job */
+    if(lastRj)
+        LIST_INSERT_AFTER(lastRj, rj, next);
+    else
+        LIST_INSERT_HEAD(&server->repeatedJobs, rj, next);
 }
 
-UA_StatusCode UA_Server_addRepeatedJob(UA_Server *server, UA_Job job, UA_UInt32 interval, UA_Guid *jobId) {
+UA_StatusCode
+UA_Server_addRepeatedJob(UA_Server *server, UA_Job job,
+                         UA_UInt32 interval, UA_Guid *jobId) {
     /* the interval needs to be at least 5ms */
     if(interval < 5)
         return UA_STATUSCODE_BADINTERNALERROR;
     interval *= (UA_UInt32)UA_MSEC_TO_DATETIME; // from ms to 100ns resolution
 
-#ifdef UA_ENABLE_MULTITHREADING
-    struct AddRepeatedJob *arw = UA_malloc(sizeof(struct AddRepeatedJob));
-    if(!arw)
+    /* Create and fill the repeated job structure */
+    struct RepeatedJob *rj = UA_malloc(sizeof(struct RepeatedJob));
+    if(!rj)
         return UA_STATUSCODE_BADOUTOFMEMORY;
+    rj->nextTime = UA_DateTime_nowMonotonic() + interval;
+    rj->interval = interval;
+    rj->id = UA_Guid_random();
+    rj->job = job;
 
-    arw->interval = interval;
-    arw->job.job = job;
-    if(jobId) {
-        arw->job.id = UA_Guid_random();
-        *jobId = arw->job.id;
-    } else
-        UA_Guid_init(&arw->job.id);
-
+#ifdef UA_ENABLE_MULTITHREADING
+    /* Call addRepeatedJob from the main loop */
     struct MainLoopJob *mlw = UA_malloc(sizeof(struct MainLoopJob));
     if(!mlw) {
-        UA_free(arw);
+        UA_free(rj);
         return UA_STATUSCODE_BADOUTOFMEMORY;
     }
     mlw->job = (UA_Job) {
         .type = UA_JOBTYPE_METHODCALL,
-        .job.methodCall = {.data = arw, .method = (void (*)(UA_Server*, void*))addRepeatedJob}};
+        .job.methodCall = {.data = rj, .method = (void (*)(UA_Server*, void*))addRepeatedJob}};
     cds_lfs_push(&server->mainLoopJobs, &mlw->node);
 #else
-    struct AddRepeatedJob arw;
-    arw.interval = interval;
-    arw.job.job = job;
-    if(jobId) {
-        arw.job.id = UA_Guid_random();
-        *jobId = arw.job.id;
-    } else
-        UA_Guid_init(&arw.job.id);
-    addRepeatedJob(server, &arw);
+    /* Add directly */
+    addRepeatedJob(server, rj);
 #endif
+    if(jobId)
+        *jobId = rj->id;
     return UA_STATUSCODE_GOOD;
 }
 
 /* Returns the next datetime when a repeated job is scheduled */
-static UA_DateTime processRepeatedJobs(UA_Server *server, UA_DateTime current) {
-    struct RepeatedJobs *tw, *tmp_tw;
+static UA_DateTime
+processRepeatedJobs(UA_Server *server, UA_DateTime current) {
+    struct RepeatedJob *rj, *tmp_rj;
     /* Iterate over the list of elements (sorted according to the next execution timestamp) */
-    LIST_FOREACH_SAFE(tw, &server->repeatedJobs, pointers, tmp_tw) {
-        if(tw->nextTime > current)
+    LIST_FOREACH_SAFE(rj, &server->repeatedJobs, next, tmp_rj) {
+        if(rj->nextTime > current)
             break;
 
+        /* Dispatch/process job */
 #ifdef UA_ENABLE_MULTITHREADING
-        // copy the entry and insert at the new location
-        UA_Job *jobsCopy = UA_malloc(sizeof(UA_Job) * tw->jobsSize);
-        if(!jobsCopy) {
-            UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
-                         "Not enough memory to dispatch delayed jobs");
-            break;
-        }
-        for(size_t i=0;i<tw->jobsSize;i++)
-            jobsCopy[i] = tw->jobs[i].job;
-        dispatchJobs(server, jobsCopy, tw->jobsSize); // frees the job pointer
+        dispatchJob(server, &rj->job);
 #else
-        size_t size = tw->jobsSize;
-        for(size_t i = 0; i < size; i++)
-            processJobs(server, &tw->jobs[i].job, 1); // does not free the job ptr
+        processJob(server, &rj->job);
 #endif
 
-        /* Elements are removed only here. Check if empty. */
-        if(tw->jobsSize == 0) {
-            LIST_REMOVE(tw, pointers);
-            UA_free(tw);
-            UA_assert(LIST_FIRST(&server->repeatedJobs) != tw); /* Assert for static code checkers */
-            continue;
-        }
-
         /* Set the time for the next execution */
-        tw->nextTime += tw->interval;
-        if(tw->nextTime < current)
-            tw->nextTime = current;
+        rj->nextTime += rj->interval;
+        if(rj->nextTime < current)
+            rj->nextTime = current;
 
-        /* Reinsert to keep the list sorted */
-        struct RepeatedJobs *prevTw = LIST_FIRST(&server->repeatedJobs);
+        /* Keep the list sorted */
+        struct RepeatedJob *prev_rj = LIST_FIRST(&server->repeatedJobs);
         while(true) {
-            struct RepeatedJobs *n = LIST_NEXT(prevTw, pointers);
-            if(!n || n->nextTime > tw->nextTime)
+            struct RepeatedJob *n = LIST_NEXT(prev_rj, next);
+            if(!n || n->nextTime > rj->nextTime)
                 break;
-            prevTw = n;
+            prev_rj = n;
         }
-        if(prevTw != tw) {
-            LIST_REMOVE(tw, pointers);
-            LIST_INSERT_AFTER(prevTw, tw, pointers);
+        if(prev_rj != rj) {
+            LIST_REMOVE(rj, next);
+            LIST_INSERT_AFTER(prev_rj, rj, next);
         }
     }
 
-    // check if the next repeated job is sooner than the usual timeout
-    // calc in 32 bit must be ok
-    struct RepeatedJobs *first = LIST_FIRST(&server->repeatedJobs);
+    /* Check if the next repeated job is sooner than the usual timeout */
+    struct RepeatedJob *first = LIST_FIRST(&server->repeatedJobs);
     UA_DateTime next = current + (MAXTIMEOUT * UA_MSEC_TO_DATETIME);
     if(first && first->nextTime < next)
         next = first->nextTime;
@@ -350,23 +253,19 @@ static UA_DateTime processRepeatedJobs(UA_Server *server, UA_DateTime current) {
 }
 
 /* Call this function only from the main loop! */
-static void removeRepeatedJob(UA_Server *server, UA_Guid *jobId) {
-    struct RepeatedJobs *tw;
-    LIST_FOREACH(tw, &server->repeatedJobs, pointers) {
-        for(size_t i = 0; i < tw->jobsSize; i++) {
-            if(!UA_Guid_equal(jobId, &tw->jobs[i].id))
-                continue;
-            tw->jobsSize--; /* if size == 0, tw is freed during the next processing */
-            if(tw->jobsSize > 0)
-                tw->jobs[i] = tw->jobs[tw->jobsSize]; // move the last entry to overwrite
-            goto finish;
-        }
+static void
+removeRepeatedJob(UA_Server *server, UA_Guid *jobId) {
+    struct RepeatedJob *rj;
+    LIST_FOREACH(rj, &server->repeatedJobs, next) {
+        if(!UA_Guid_equal(jobId, &rj->id))
+            continue;
+        LIST_REMOVE(rj, next);
+        UA_free(rj);
+        break;
     }
- finish:
 #ifdef UA_ENABLE_MULTITHREADING
     UA_free(jobId);
 #endif
-    return;
 }
 
 UA_StatusCode UA_Server_removeRepeatedJob(UA_Server *server, UA_Guid jobId) {
@@ -388,9 +287,9 @@ UA_StatusCode UA_Server_removeRepeatedJob(UA_Server *server, UA_Guid jobId) {
 }
 
 void UA_Server_deleteAllRepeatedJobs(UA_Server *server) {
-    struct RepeatedJobs *current, *temp;
-    LIST_FOREACH_SAFE(current, &server->repeatedJobs, pointers, temp) {
-        LIST_REMOVE(current, pointers);
+    struct RepeatedJob *current, *temp;
+    LIST_FOREACH_SAFE(current, &server->repeatedJobs, next, temp) {
+        LIST_REMOVE(current, next);
         UA_free(current);
     }
 }
@@ -418,9 +317,9 @@ static void getCounters(UA_Server *server, struct DelayedJobs *delayed) {
     delayed->workerCounters = counters;
 }
 
-// Call from the main thread only. This is the only function that modifies
-// server->delayedWork. processDelayedWorkQueue modifies the "next" (after the
-// head).
+/* Call from the main thread only. This is the only function that modifies */
+/* server->delayedWork. processDelayedWorkQueue modifies the "next" (after the */
+/* head). */
 static void addDelayedJob(UA_Server *server, UA_Job *job) {
     struct DelayedJobs *dj = server->delayedJobs;
     if(!dj || dj->jobsCount >= DELAYEDJOBSSIZE) {
@@ -438,37 +337,29 @@ static void addDelayedJob(UA_Server *server, UA_Job *job) {
 
         /* dispatch a method that sets the counter for the full list that comes afterwards */
         if(dj->next) {
-            UA_Job *setCounter = UA_malloc(sizeof(UA_Job));
-            *setCounter = (UA_Job) {.type = UA_JOBTYPE_METHODCALL, .job.methodCall =
-                                    {.method = (void (*)(UA_Server*, void*))getCounters, .data = dj->next}};
-            dispatchJobs(server, setCounter, 1);
+            UA_Job setCounter = (UA_Job){
+                .type = UA_JOBTYPE_METHODCALL, .job.methodCall =
+                {.method = (void (*)(UA_Server*, void*))getCounters, .data = dj->next}};
+            dispatchJob(server, &setCounter);
         }
     }
     dj->jobs[dj->jobsCount] = *job;
     dj->jobsCount++;
 }
 
-static void addDelayedJobAsync(UA_Server *server, UA_Job *job) {
-    addDelayedJob(server, job);
-    UA_free(job);
-}
-
-static void server_free(UA_Server *server, void *data) {
+static void
+delayed_free(UA_Server *server, void *data) {
     UA_free(data);
 }
 
 UA_StatusCode UA_Server_delayedFree(UA_Server *server, void *data) {
-    UA_Job *j = UA_malloc(sizeof(UA_Job));
-    if(!j)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-    j->type = UA_JOBTYPE_METHODCALL;
-    j->job.methodCall.data = data;
-    j->job.methodCall.method = server_free;
-    struct MainLoopJob *mlw = UA_malloc(sizeof(struct MainLoopJob));
-    mlw->job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL, .job.methodCall =
-                         {.data = j, .method = (UA_ServerCallback)addDelayedJobAsync}};
-    cds_lfs_push(&server->mainLoopJobs, &mlw->node);
-    return UA_STATUSCODE_GOOD;
+    return UA_Server_delayedCallback(server, delayed_free, data);
+}
+
+static void
+addDelayedJobAsync(UA_Server *server, UA_Job *job) {
+    addDelayedJob(server, job);
+    UA_free(job);
 }
 
 UA_StatusCode
@@ -522,10 +413,11 @@ dispatchDelayedJobs(UA_Server *server, void *_) {
 #endif
     /* process and free all delayed jobs from here on */
     while(dw) {
-        processJobs(server, dw->jobs, dw->jobsCount);
+        for(size_t i = 0; i < dw->jobsCount; i++)
+            processJob(server, &dw->jobs[i]);
         struct DelayedJobs *next = uatomic_xchg(&beforedw->next, NULL);
-        UA_free(dw);
         UA_free(dw->workerCounters);
+        UA_free(dw);
         dw = next;
     }
 #if (__GNUC__ <= 4 && __GNUC_MINOR__ <= 6)
@@ -549,12 +441,11 @@ static void processMainLoopJobs(UA_Server *server) {
     struct MainLoopJob *mlw = (struct MainLoopJob*)&head->node;
     struct MainLoopJob *next;
     do {
-        processJobs(server, &mlw->job, 1);
+        processJob(server, &mlw->job);
         next = (struct MainLoopJob*)mlw->node.next;
         UA_free(mlw);
         //cppcheck-suppress unreadVariable
     } while((mlw = next));
-    //UA_free(head);
 }
 #endif
 
@@ -591,7 +482,10 @@ UA_StatusCode UA_Server_run_startup(UA_Server *server) {
     return result;
 }
 
-static void completeMessages(UA_Server *server, UA_Job *job) {
+/* completeMessages is run synchronously on the jobs returned from the network
+   layer, so that the order for processing TCP packets is never mixed up. */
+static void
+completeMessages(UA_Server *server, UA_Job *job) {
     UA_Boolean realloced = UA_FALSE;
     UA_StatusCode retval = UA_Connection_completeMessages(job->job.binaryMessage.connection,
                                                           &job->job.binaryMessage.message, &realloced);
@@ -653,16 +547,22 @@ UA_UInt16 UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
                 completeMessages(server, &jobs[k]);
         }
 
+        /* Dispatch/process jobs */
+        for(size_t j = 0; j < jobsSize; j++) {
 #ifdef UA_ENABLE_MULTITHREADING
-        dispatchJobs(server, jobs, jobsSize);
-        /* Wake up worker threads */
-        if(jobsSize > 0)
-            pthread_cond_broadcast(&server->dispatchQueue_condition);
+            dispatchJob(server, &jobs[j]);
 #else
-        processJobs(server, jobs, jobsSize);
-        if(jobsSize > 0)
-            UA_free(jobs);
+            processJob(server, &jobs[j]);
 #endif
+        }
+
+        if(jobsSize > 0) {
+#ifdef UA_ENABLE_MULTITHREADING
+            /* Wake up worker threads */
+            pthread_cond_broadcast(&server->dispatchQueue_condition);
+#endif
+            UA_free(jobs);
+        }
     }
 
     now = UA_DateTime_nowMonotonic();
@@ -677,7 +577,8 @@ UA_StatusCode UA_Server_run_shutdown(UA_Server *server) {
         UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
         UA_Job *stopJobs;
         size_t stopJobsSize = nl->stop(nl, &stopJobs);
-        processJobs(server, stopJobs, stopJobsSize);
+        for(size_t j = 0; j < stopJobsSize; j++)
+            processJob(server, &stopJobs[j]);
         UA_free(stopJobs);
     }
 

+ 1 - 1
src/server/ua_services_session.c

@@ -44,7 +44,7 @@ void Service_CreateSession(UA_Server *server, UA_SecureChannel *channel,
          return;
     }
     UA_LOG_DEBUG_CHANNEL(server->config.logger, channel, "Session " UA_PRINTF_GUID_FORMAT " created",
-                         UA_PRINTF_GUID_DATA(newSession->sessionId));
+                         UA_PRINTF_GUID_DATA(newSession->sessionId.identifier.guid));
 }
 
 void

+ 62 - 20
src/server/ua_services_subscription.c

@@ -15,8 +15,16 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
                         UA_Double requestedPublishingInterval,
                         UA_UInt32 requestedLifetimeCount,
                         UA_UInt32 requestedMaxKeepAliveCount,
-                        UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
-    Subscription_unregisterPublishJob(server, subscription);
+                        UA_UInt32 maxNotificationsPerPublish, UA_Byte priority)
+{
+    /* deregister the job if required */
+    UA_StatusCode retval = Subscription_unregisterPublishJob(server, subscription);
+    if(retval != UA_STATUSCODE_GOOD)
+        UA_LOG_DEBUG_SESSION(server->config.logger, subscription->session, "Subscription %u | "
+                             "Could not unregister publish job with error code 0x%08x",
+                             subscription->subscriptionID, retval);
+
+    /* re-parameterize the subscription */
     subscription->publishingInterval = requestedPublishingInterval;
     UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
                                requestedPublishingInterval, subscription->publishingInterval);
@@ -34,30 +42,45 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
        maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
         subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
     subscription->priority = priority;
-    Subscription_registerPublishJob(server, subscription);
+
+    retval = Subscription_registerPublishJob(server, subscription);
+    if(retval != UA_STATUSCODE_GOOD)
+        UA_LOG_DEBUG_SESSION(server->config.logger, subscription->session, "Subscription %u | "
+                             "Could not register publish job with error code 0x%08x",
+                             subscription->subscriptionID, retval);
 }
 
-void Service_CreateSubscription(UA_Server *server, UA_Session *session,
-                                const UA_CreateSubscriptionRequest *request,
-                                UA_CreateSubscriptionResponse *response) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing CreateSubscriptionRequest");
-    response->subscriptionId = UA_Session_getUniqueSubscriptionID(session);
+void
+Service_CreateSubscription(UA_Server *server, UA_Session *session,
+                           const UA_CreateSubscriptionRequest *request,
+                           UA_CreateSubscriptionResponse *response)
+{
+    /* Create the subscription */
     UA_Subscription *newSubscription = UA_Subscription_new(session, response->subscriptionId);
     if(!newSubscription) {
+        UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing CreateSubscriptionRequest failed");
         response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
         return;
     }
-
+    newSubscription->subscriptionID = UA_Session_getUniqueSubscriptionID(session);
     UA_Session_addSubscription(session, newSubscription);
+
+    /* Set the subscription parameters */
     newSubscription->publishingEnabled = request->publishingEnabled;
+    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount;
     setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
                             request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
                             request->maxNotificationsPerPublish, request->priority);
-    /* immediately send the first response */
-    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount;
+
+    /* Prepare the response */
+    response->subscriptionId = newSubscription->subscriptionID;
     response->revisedPublishingInterval = newSubscription->publishingInterval;
     response->revisedLifetimeCount = newSubscription->lifeTimeCount;
     response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;
+
+    UA_LOG_DEBUG_SESSION(server->config.logger, session, "CreateSubscriptionRequest: Created Subscription %u "
+                         "with a publishing interval of %f ms", response->subscriptionId,
+                         newSubscription->publishingInterval);
 }
 
 void Service_ModifySubscription(UA_Server *server, UA_Session *session,
@@ -103,8 +126,14 @@ void Service_SetPublishingMode(UA_Server *server, UA_Session *session,
             response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
             continue;
         }
-        sub->publishingEnabled = request->publishingEnabled;
-        sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
+        if(sub->publishingEnabled != request->publishingEnabled) {
+            sub->publishingEnabled = request->publishingEnabled;
+            sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
+            if(sub->publishingEnabled)
+                Subscription_registerPublishJob(server, sub);
+            else
+                Subscription_unregisterPublishJob(server, sub);
+        }
     }
 }
 
@@ -290,7 +319,8 @@ void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
     response->resultsSize = request->itemsToModifySize;
 
     for(size_t i = 0; i < request->itemsToModifySize; i++)
-        Service_ModifyMonitoredItems_single(server, session, sub, &request->itemsToModify[i], &response->results[i]);
+        Service_ModifyMonitoredItems_single(server, session, sub, &request->itemsToModify[i],
+                                            &response->results[i]);
 
 }
 
@@ -407,7 +437,8 @@ Service_Publish(UA_Server *server, UA_Session *session,
     UA_Subscription *immediate;
     LIST_FOREACH(immediate, &session->serverSubscriptions, listEntry) {
         if(immediate->state == UA_SUBSCRIPTIONSTATE_LATE) {
-            UA_LOG_DEBUG_SESSION(server->config.logger, session, "Response on a late subscription",
+            UA_LOG_DEBUG_SESSION(server->config.logger, session, "Subscription %u | "
+                                 "Response on a late subscription", immediate->subscriptionID,
                                  session->authenticationToken.identifier.numeric);
             UA_Subscription_publishCallback(server, immediate);
             return;
@@ -415,9 +446,10 @@ Service_Publish(UA_Server *server, UA_Session *session,
     }
 }
 
-void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
-                                 const UA_DeleteSubscriptionsRequest *request,
-                                 UA_DeleteSubscriptionsResponse *response) {
+void
+Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
+                            const UA_DeleteSubscriptionsRequest *request,
+                            UA_DeleteSubscriptionsResponse *response) {
     UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing DeleteSubscriptionsRequest");
 
     if(request->subscriptionIdsSize == 0){
@@ -432,14 +464,24 @@ void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
     }
     response->resultsSize = request->subscriptionIdsSize;
 
-    for(size_t i = 0; i < request->subscriptionIdsSize; i++)
+    for(size_t i = 0; i < request->subscriptionIdsSize; i++) {
         response->results[i] = UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
+        if(response->results[i] == UA_STATUSCODE_GOOD) {
+            UA_LOG_DEBUG_SESSION(server->config.logger, session, "Subscription %u | "
+                                "Subscription deleted", request->subscriptionIds[i]);
+        } else {
+            UA_LOG_DEBUG_SESSION(server->config.logger, session, "Deleting Subscription with Id "
+                                 "%u failed with error code 0x%08x", request->subscriptionIds[i],
+                                 response->results[i]);
+        }
+    }
 }
 
 void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
                                   const UA_DeleteMonitoredItemsRequest *request,
                                   UA_DeleteMonitoredItemsResponse *response) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing DeleteMonitoredItemsRequest");
+    UA_LOG_DEBUG_SESSION(server->config.logger, session,
+                         "Processing DeleteMonitoredItemsRequest");
 
     if(request->monitoredItemIdsSize == 0) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;

+ 2 - 2
src/server/ua_session_manager.c

@@ -46,7 +46,7 @@ UA_SessionManager_getSession(UA_SessionManager *sm, const UA_NodeId *token) {
             if(UA_DateTime_nowMonotonic() > current->session.validTill) {
                 UA_LOG_DEBUG(sm->server->config.logger, UA_LOGCATEGORY_SESSION,
                              "Try to use Session with token " UA_PRINTF_GUID_FORMAT ", but has timed out",
-                             UA_PRINTF_GUID_DATA((*token)));
+                             UA_PRINTF_GUID_DATA(token->identifier.guid));
                 return NULL;
             }
             return &current->session;
@@ -54,7 +54,7 @@ UA_SessionManager_getSession(UA_SessionManager *sm, const UA_NodeId *token) {
     }
     UA_LOG_DEBUG(sm->server->config.logger, UA_LOGCATEGORY_SESSION,
                  "Try to use Session with token " UA_PRINTF_GUID_FORMAT " but is not found",
-                 UA_PRINTF_GUID_DATA((*token)));
+                 UA_PRINTF_GUID_DATA(token->identifier.guid));
     return NULL;
 }
 

+ 16 - 5
src/server/ua_subscription.c

@@ -55,7 +55,8 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
     MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
     if(!newvalue) {
         UA_LOG_WARNING_SESSION(server->config.logger, sub->session, "Subscription %u | MonitoredItem %i | "
-                               "Skipped a sample due to lack of memory", sub->subscriptionID, monitoredItem->itemId);
+                               "Skipped a sample due to lack of memory", sub->subscriptionID,
+                               monitoredItem->itemId);
         return;
     }
     UA_DataValue_init(&newvalue->value);
@@ -222,6 +223,9 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
 }
 
 void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
+    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
+                         "Publish Callback", sub->subscriptionID);
+
     /* Count the available notifications */
     size_t notifications = 0;
     UA_Boolean moreNotifications = false;
@@ -262,8 +266,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         } else {
             sub->currentLifetimeCount++;
             if(sub->currentLifetimeCount > sub->lifeTimeCount) {
-                UA_LOG_INFO_SESSION(server->config.logger, sub->session, "Subscription %u | "
-                                    "End of lifetime for subscription", sub->subscriptionID);
+                UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
+                                     "End of lifetime for subscription", sub->subscriptionID);
                 UA_Session_deleteSubscription(server, sub->session, sub->subscriptionID);
             }
         }
@@ -316,7 +320,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
                 mon_l++;
             }
             UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | MonitoredItem %u | " \
-                                 "Adding %u notifications to the publish response. %u notifications remain in the queue",
+                                 "Adding %u notifications to the publish response. " \
+                                 "%u notifications remain in the queue",
                                  sub->subscriptionID, mon->itemId, mon_l, mon->currentQueueSize);
             l += mon_l;
         }
@@ -352,7 +357,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 
     /* Send the response */
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                         "Sending out a publish response with %u notifications", (UA_UInt32)notifications);
+                         "Subscription %u | Sending out a publish response with %u notifications",
+                         sub->subscriptionID, (UA_UInt32)notifications);
     UA_SecureChannel_sendBinaryMessage(sub->session->channel, requestId, response,
                                        &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
 
@@ -368,6 +374,11 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 }
 
 UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
+    if(sub->publishJobIsRegistered)
+        return UA_STATUSCODE_GOOD;
+    if(!sub->publishingEnabled)
+        return UA_STATUSCODE_GOOD;
+
     UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
                            .job.methodCall = {.method = (UA_ServerCallback)UA_Subscription_publishCallback,
                                               .data = sub} };

+ 5 - 2
src/ua_session.c

@@ -70,7 +70,8 @@ void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
 }
 
 void UA_Session_updateLifetime(UA_Session *session) {
-    session->validTill = UA_DateTime_nowMonotonic() + (UA_DateTime)(session->timeout * UA_MSEC_TO_DATETIME);
+    session->validTill = UA_DateTime_nowMonotonic() +
+        (UA_DateTime)(session->timeout * UA_MSEC_TO_DATETIME);
 }
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS
@@ -80,7 +81,8 @@ void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscri
 }
 
 UA_StatusCode
-UA_Session_deleteSubscription(UA_Server *server, UA_Session *session, UA_UInt32 subscriptionID) {
+UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
+                              UA_UInt32 subscriptionID) {
     UA_Subscription *sub = UA_Session_getSubscriptionByID(session, subscriptionID);
     if(!sub)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
@@ -116,6 +118,7 @@ void UA_Session_answerPublishRequestsWithoutSubscription(UA_Session *session) {
         UA_PublishResponse *response = &pre->response;
         UA_UInt32 requestId = pre->requestId;
         response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
+        response->responseHeader.timestamp = UA_DateTime_now();
         UA_SecureChannel_sendBinaryMessage(session->channel, requestId, response,
                                            &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
         UA_free(pre);

+ 6 - 6
src/ua_session.h

@@ -79,42 +79,42 @@ void UA_Session_answerPublishRequestsWithoutSubscription(UA_Session *session);
     UA_LOG_TRACE(LOGGER, UA_LOGCATEGORY_SESSION, "Connection %i | SecureChannel %i | Session " UA_PRINTF_GUID_FORMAT " | " MSG, \
                  (SESSION->channel ? (SESSION->channel->connection ? SESSION->channel->connection->sockfd : 0) : 0), \
                  (SESSION->channel ? SESSION->channel->securityToken.channelId : 0), \
-                 UA_PRINTF_GUID_DATA(SESSION->sessionId), \
+                 UA_PRINTF_GUID_DATA(SESSION->sessionId.identifier.guid), \
                  ##__VA_ARGS__);
 
 #define UA_LOG_DEBUG_SESSION(LOGGER, SESSION, MSG, ...)                 \
     UA_LOG_DEBUG(LOGGER, UA_LOGCATEGORY_SESSION, "Connection %i | SecureChannel %i | Session " UA_PRINTF_GUID_FORMAT " | " MSG, \
                  (SESSION->channel ? (SESSION->channel->connection ? SESSION->channel->connection->sockfd : 0) : 0), \
                  (SESSION->channel ? SESSION->channel->securityToken.channelId : 0), \
-                 UA_PRINTF_GUID_DATA(SESSION->sessionId), \
+                 UA_PRINTF_GUID_DATA(SESSION->sessionId.identifier.guid), \
                  ##__VA_ARGS__);
 
 #define UA_LOG_INFO_SESSION(LOGGER, SESSION, MSG, ...)                  \
     UA_LOG_INFO(LOGGER, UA_LOGCATEGORY_SESSION, "Connection %i | SecureChannel %i | Session " UA_PRINTF_GUID_FORMAT " | " MSG, \
                  (SESSION->channel ? (SESSION->channel->connection ? SESSION->channel->connection->sockfd : 0) : 0), \
                  (SESSION->channel ? SESSION->channel->securityToken.channelId : 0), \
-                 UA_PRINTF_GUID_DATA(SESSION->sessionId), \
+                 UA_PRINTF_GUID_DATA(SESSION->sessionId.identifier.guid), \
                  ##__VA_ARGS__);
 
 #define UA_LOG_WARNING_SESSION(LOGGER, SESSION, MSG, ...)               \
     UA_LOG_WARNING(LOGGER, UA_LOGCATEGORY_SESSION, "Connection %i | SecureChannel %i | Session " UA_PRINTF_GUID_FORMAT " | " MSG, \
                    (SESSION->channel ? (SESSION->channel->connection ? SESSION->channel->connection->sockfd : 0) : 0), \
                    (SESSION->channel ? SESSION->channel->securityToken.channelId : 0), \
-                   UA_PRINTF_GUID_DATA(SESSION->sessionId), \
+                   UA_PRINTF_GUID_DATA(SESSION->sessionId.identifier.guid), \
                    ##__VA_ARGS__);
 
 #define UA_LOG_ERROR_SESSION(LOGGER, SESSION, MSG, ...)                 \
     UA_LOG_ERROR(LOGGER, UA_LOGCATEGORY_SESSION, "Connection %i | SecureChannel %i | Session " UA_PRINTF_GUID_FORMAT " | " MSG, \
                  (SESSION->channel ? (SESSION->channel->connection ? SESSION->channel->connection->sockfd : 0) : 0), \
                  (SESSION->channel ? SESSION->channel->securityToken.channelId : 0), \
-                 UA_PRINTF_GUID_DATA(SESSION->sessionId), \
+                 UA_PRINTF_GUID_DATA(SESSION->sessionId.identifier.guid), \
                  ##__VA_ARGS__);
 
 #define UA_LOG_FATAL_SESSION(LOGGER, SESSION, MSG, ...)                 \
     UA_LOG_FATAL(LOGGER, UA_LOGCATEGORY_SESSION, "Connection %i | SecureChannel %i | Session " UA_PRINTF_GUID_FORMAT " | " MSG, \
                  (SESSION->channel ? (SESSION->channel->connection ? SESSION->channel->connection->sockfd : 0) : 0), \
                  (SESSION->channel ? SESSION->channel->securityToken.channelId : 0), \
-                 UA_PRINTF_GUID_DATA(SESSION->sessionId), \
+                 UA_PRINTF_GUID_DATA(SESSION->sessionId.identifier.guid), \
                  ##__VA_ARGS__);
 
 #endif /* UA_SESSION_H_ */

+ 4 - 0
tests/CMakeLists.txt

@@ -52,6 +52,10 @@ add_executable(check_services_nodemanagement check_services_nodemanagement.c $<T
 target_link_libraries(check_services_nodemanagement ${LIBS})
 add_test(services_nodemanagement ${CMAKE_CURRENT_BINARY_DIR}/check_services_nodemanagement)
 
+add_executable(check_services_subscriptions check_services_subscriptions.c $<TARGET_OBJECTS:open62541-object>)
+target_link_libraries(check_services_subscriptions ${LIBS})
+add_test(check_services_subscriptions ${CMAKE_CURRENT_BINARY_DIR}/check_services_subscriptions)
+
 add_executable(check_nodestore check_nodestore.c $<TARGET_OBJECTS:open62541-object>)
 target_link_libraries(check_nodestore ${LIBS})
 add_test(nodestore ${CMAKE_CURRENT_BINARY_DIR}/check_nodestore)

+ 128 - 0
tests/check_services_subscriptions.c

@@ -0,0 +1,128 @@
+#include "ua_server.h"
+#include "server/ua_services.h"
+#include "server/ua_server_internal.h"
+#include "server/ua_subscription.h"
+#include "ua_config_standard.h"
+
+#include "check.h"
+#include <unistd.h>
+
+UA_Server *server = NULL;
+
+static void setup(void) {
+    server = UA_Server_new(UA_ServerConfig_standard);
+    UA_Server_run_startup(server);
+}
+
+static void teardown(void) {
+    UA_Server_run_shutdown(server);
+    UA_Server_delete(server);
+}
+
+START_TEST(Server_createSubscription) {
+    /* Create a subscription */
+    UA_CreateSubscriptionRequest request;
+    UA_CreateSubscriptionRequest_init(&request);
+    request.publishingEnabled = true;
+
+    UA_CreateSubscriptionResponse response;
+    UA_CreateSubscriptionResponse_init(&response);
+
+    Service_CreateSubscription(server, &adminSession, &request, &response);
+    ck_assert_uint_eq(response.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    UA_UInt32 subscriptionId = response.subscriptionId;
+
+    UA_CreateSubscriptionResponse_deleteMembers(&response);
+
+    /* Remove the subscription */
+    UA_DeleteSubscriptionsRequest del_request;
+    UA_DeleteSubscriptionsRequest_init(&del_request);
+    del_request.subscriptionIdsSize = 1;
+    del_request.subscriptionIds = &subscriptionId;
+
+    UA_DeleteSubscriptionsResponse del_response;
+    UA_DeleteSubscriptionsResponse_init(&del_response);
+
+    Service_DeleteSubscriptions(server, &adminSession, &del_request, &del_response);
+    ck_assert_uint_eq(del_response.resultsSize, 1);
+    ck_assert_uint_eq(del_response.results[0], UA_STATUSCODE_GOOD);
+
+    UA_DeleteSubscriptionsResponse_deleteMembers(&del_response);
+}
+END_TEST
+
+START_TEST(Server_publishCallback) {
+    /* Create a subscription */
+    UA_CreateSubscriptionRequest request;
+    UA_CreateSubscriptionResponse response;
+
+    UA_CreateSubscriptionRequest_init(&request);
+    request.publishingEnabled = true;
+    UA_CreateSubscriptionResponse_init(&response);
+    Service_CreateSubscription(server, &adminSession, &request, &response);
+    ck_assert_uint_eq(response.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    UA_UInt32 subscriptionId1 = response.subscriptionId;
+    UA_CreateSubscriptionResponse_deleteMembers(&response);
+
+    /* Create a second subscription */
+    UA_CreateSubscriptionRequest_init(&request);
+    request.publishingEnabled = true;
+    UA_CreateSubscriptionResponse_init(&response);
+    Service_CreateSubscription(server, &adminSession, &request, &response);
+    ck_assert_uint_eq(response.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    UA_UInt32 subscriptionId2 = response.subscriptionId;
+    UA_Double publishingInterval = response.revisedPublishingInterval;
+    ck_assert(publishingInterval > 0.0f);
+    UA_CreateSubscriptionResponse_deleteMembers(&response);
+
+    /* Sleep until the publishing interval times out */
+    usleep((useconds_t)(publishingInterval * 1000) + 1000);
+
+
+    UA_Subscription *sub;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry)
+        ck_assert_uint_eq(sub->currentKeepAliveCount, 0);
+
+    UA_Server_run_iterate(server, false);
+
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry)
+        ck_assert_uint_eq(sub->currentKeepAliveCount, 1);
+
+    /* Remove the subscriptions */
+    UA_DeleteSubscriptionsRequest del_request;
+    UA_DeleteSubscriptionsRequest_init(&del_request);
+    UA_UInt32 removeIds[2] = {subscriptionId1, subscriptionId2};
+    del_request.subscriptionIdsSize = 2;
+    del_request.subscriptionIds = removeIds;
+
+    UA_DeleteSubscriptionsResponse del_response;
+    UA_DeleteSubscriptionsResponse_init(&del_response);
+
+    Service_DeleteSubscriptions(server, &adminSession, &del_request, &del_response);
+    ck_assert_uint_eq(del_response.resultsSize, 2);
+    ck_assert_uint_eq(del_response.results[0], UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(del_response.results[1], UA_STATUSCODE_GOOD);
+
+    UA_DeleteSubscriptionsResponse_deleteMembers(&del_response);
+}
+END_TEST
+
+static Suite* testSuite_Client(void) {
+    Suite *s = suite_create("Server Subscription");
+    TCase *tc_server = tcase_create("Server Subscription Basic");
+    tcase_add_checked_fixture(tc_server, setup, teardown);
+    tcase_add_test(tc_server, Server_createSubscription);
+    tcase_add_test(tc_server, Server_publishCallback);
+    suite_add_tcase(s, tc_server);
+    return s;
+}
+
+int main(void) {
+    Suite *s = testSuite_Client();
+    SRunner *sr = srunner_create(s);
+    srunner_set_fork_status(sr, CK_NOFORK);
+    srunner_run_all(sr, CK_NORMAL);
+    int number_failed = srunner_ntests_failed(sr);
+    srunner_free(sr);
+    return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}