|
@@ -1,4 +1,9 @@
|
|
|
#include <stdio.h>
|
|
|
+#define __USE_POSIX
|
|
|
+#define _XOPEN_SOURCE 500
|
|
|
+#define __USE_POSIX199309
|
|
|
+#include <sys/time.h>
|
|
|
+#include <time.h>
|
|
|
#include "ua_server_internal.h"
|
|
|
|
|
|
/**
|
|
@@ -14,7 +19,7 @@
|
|
|
* all previous work has actually finished (only for multithreading)
|
|
|
*/
|
|
|
|
|
|
-#define MAXTIMEOUT 5000 // max timeout in usec until the next main loop iteration
|
|
|
+#define MAXTIMEOUT 50000 // max timeout in usec until the next main loop iteration
|
|
|
#define BATCHSIZE 20 // max size of worklists that are dispatched to workers
|
|
|
|
|
|
static void processWork(UA_Server *server, const UA_WorkItem *work, UA_Int32 workSize) {
|
|
@@ -93,6 +98,11 @@ static void * workerLoop(struct workerStartData *startInfo) {
|
|
|
UA_Server *server = startInfo->server;
|
|
|
UA_free(startInfo);
|
|
|
|
|
|
+ pthread_mutex_t mutex; // required for the condition variable
|
|
|
+ pthread_mutex_init(&mutex,0);
|
|
|
+ pthread_mutex_lock(&mutex);
|
|
|
+ struct timespec to;
|
|
|
+
|
|
|
while(*server->running) {
|
|
|
struct workListNode *wln = (struct workListNode*)
|
|
|
cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
|
|
@@ -100,9 +110,15 @@ static void * workerLoop(struct workerStartData *startInfo) {
|
|
|
processWork(server, wln->work, wln->workSize);
|
|
|
UA_free(wln->work);
|
|
|
UA_free(wln);
|
|
|
+ } else {
|
|
|
+ clock_gettime(CLOCK_REALTIME, &to);
|
|
|
+ to.tv_sec += 2;
|
|
|
+ pthread_cond_timedwait(&server->dispatchQueue_condition, &mutex, &to);
|
|
|
}
|
|
|
uatomic_inc(c); // increase the workerCounter;
|
|
|
}
|
|
|
+ pthread_mutex_unlock(&mutex);
|
|
|
+ pthread_mutex_destroy(&mutex);
|
|
|
rcu_unregister_thread();
|
|
|
return UA_NULL;
|
|
|
}
|
|
@@ -184,9 +200,9 @@ static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA
|
|
|
}
|
|
|
|
|
|
// Currently, these functions need to get the server mutex, but should be sufficiently fast
|
|
|
-UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_DateTime time,
|
|
|
+UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_DateTime executionTime,
|
|
|
UA_Guid *resultWorkGuid) {
|
|
|
- return addTimedWork(server, work, time, 0, resultWorkGuid);
|
|
|
+ return addTimedWork(server, work, executionTime, 0, resultWorkGuid);
|
|
|
}
|
|
|
|
|
|
UA_StatusCode UA_Server_addRepeatedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_UInt32 interval,
|
|
@@ -392,6 +408,7 @@ UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *r
|
|
|
// 1) Prepare the threads
|
|
|
server->running = running; // the threads need to access the variable
|
|
|
server->nThreads = nThreads;
|
|
|
+ pthread_cond_init(&server->dispatchQueue_condition, 0);
|
|
|
pthread_t *thr = UA_malloc(nThreads * sizeof(pthread_t));
|
|
|
server->workerCounters = UA_malloc(nThreads * sizeof(UA_UInt32 *));
|
|
|
for(UA_UInt32 i=0;i<nThreads;i++) {
|
|
@@ -439,13 +456,14 @@ UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *r
|
|
|
work[k].type = UA_WORKITEMTYPE_NOTHING;
|
|
|
}
|
|
|
dispatchWork(server, workSize, work);
|
|
|
+ if(workSize > 0)
|
|
|
+ pthread_cond_broadcast(&server->dispatchQueue_condition);
|
|
|
#else
|
|
|
processWork(server, work, workSize);
|
|
|
UA_free(work);
|
|
|
#endif
|
|
|
}
|
|
|
|
|
|
-
|
|
|
// 3.3) Exit?
|
|
|
if(!*running)
|
|
|
break;
|