|
@@ -6,10 +6,8 @@
|
|
|
*
|
|
|
* 1. Ordinary WorkItems (that are dispatched to worker threads if
|
|
|
* multithreading is activated)
|
|
|
- *
|
|
|
* 2. Timed work that is executed at a precise date (with an optional repetition
|
|
|
* interval)
|
|
|
- *
|
|
|
* 3. Delayed work that is executed at a later time when it is guaranteed that
|
|
|
* all previous work has actually finished (only for multithreading)
|
|
|
*/
|
|
@@ -21,18 +19,20 @@ static void processWork(UA_Server *server, UA_WorkItem *work, UA_Int32 workSize)
|
|
|
for(UA_Int32 i = 0; i < workSize; i++) {
|
|
|
UA_WorkItem *item = &work[i];
|
|
|
switch(item->type) {
|
|
|
- case UA_WORKITEMTYPE_BINARYNETWORKMESSAGE:
|
|
|
- UA_Server_processBinaryMessage(server, item->work.binaryNetworkMessage.connection,
|
|
|
- &item->work.binaryNetworkMessage.message);
|
|
|
- item->work.binaryNetworkMessage.connection->releaseBuffer(item->work.binaryNetworkMessage.connection,
|
|
|
- &item->work.binaryNetworkMessage.message);
|
|
|
+ case UA_WORKITEMTYPE_BINARYMESSAGE:
|
|
|
+ UA_Server_processBinaryMessage(server, item->work.binaryMessage.connection,
|
|
|
+ &item->work.binaryMessage.message);
|
|
|
+ item->work.binaryMessage.connection->releaseBuffer(item->work.binaryMessage.connection,
|
|
|
+ &item->work.binaryMessage.message);
|
|
|
+ break;
|
|
|
+ case UA_WORKITEMTYPE_CLOSECONNECTION:
|
|
|
+ UA_Connection_detachSecureChannel(item->work.closeConnection);
|
|
|
+ item->work.closeConnection->close(item->work.closeConnection);
|
|
|
break;
|
|
|
-
|
|
|
case UA_WORKITEMTYPE_METHODCALL:
|
|
|
case UA_WORKITEMTYPE_DELAYEDMETHODCALL:
|
|
|
item->work.methodCall.method(server, item->work.methodCall.data);
|
|
|
break;
|
|
|
-
|
|
|
default:
|
|
|
break;
|
|
|
}
|
|
@@ -135,76 +135,67 @@ static void emptyDispatchQueue(UA_Server *server) {
|
|
|
/* Timed Work */
|
|
|
/**************/
|
|
|
|
|
|
-struct UA_TimedWork {
|
|
|
- LIST_ENTRY(UA_TimedWork) pointers;
|
|
|
- UA_UInt16 workSize;
|
|
|
- UA_WorkItem *work;
|
|
|
- UA_Guid *workIds;
|
|
|
- UA_DateTime time;
|
|
|
- UA_UInt32 repetitionInterval; // in 100ns resolution, 0 means no repetition
|
|
|
+struct TimedWork {
|
|
|
+ LIST_ENTRY(TimedWork) pointers;
|
|
|
+ UA_DateTime nextTime;
|
|
|
+ UA_UInt32 interval; ///> in ms resolution, 0 means no repetition
|
|
|
+ size_t workSize;
|
|
|
+ struct {
|
|
|
+ UA_WorkItem work;
|
|
|
+ UA_Guid workId;
|
|
|
+ } work[];
|
|
|
};
|
|
|
|
|
|
/* The item is copied and not freed by this function. */
|
|
|
static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA_DateTime firstTime,
|
|
|
UA_UInt32 repetitionInterval, UA_Guid *resultWorkGuid) {
|
|
|
- UA_TimedWork *tw, *lastTw = UA_NULL;
|
|
|
-
|
|
|
- // search for matching entry
|
|
|
- LIST_FOREACH(tw, &server->timedWork, pointers) {
|
|
|
- if(tw->repetitionInterval == repetitionInterval &&
|
|
|
- (repetitionInterval > 0 || tw->time == firstTime))
|
|
|
- break; // found a matching entry
|
|
|
- lastTw = tw;
|
|
|
- if(tw->time > firstTime) {
|
|
|
- tw = UA_NULL; // not matchin entry exists
|
|
|
- break;
|
|
|
+ struct TimedWork *lastTw = UA_NULL, *matchingTw = UA_NULL;
|
|
|
+
|
|
|
+ /* search for matching entry */
|
|
|
+ if(repetitionInterval == 0) {
|
|
|
+ LIST_FOREACH(lastTw, &server->timedWork, pointers) {
|
|
|
+ if(lastTw->nextTime == firstTime) {
|
|
|
+ if(lastTw->nextTime == firstTime)
|
|
|
+ matchingTw = lastTw;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LIST_FOREACH(matchingTw, &server->timedWork, pointers) {
|
|
|
+ if(repetitionInterval == matchingTw->interval)
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if(tw) {
|
|
|
- // append to matching entry
|
|
|
- tw->workSize++;
|
|
|
- UA_WorkItem *biggerWorkArray = UA_realloc(tw->work, sizeof(UA_WorkItem)*tw->workSize);
|
|
|
- if(!biggerWorkArray)
|
|
|
+ struct TimedWork *newWork;
|
|
|
+ if(matchingTw) {
|
|
|
+ /* append to matching entry */
|
|
|
+ newWork = UA_realloc(matchingTw, sizeof(struct TimedWork) + (sizeof(UA_WorkItem)*matchingTw->workSize + 1));
|
|
|
+ if(!newWork)
|
|
|
return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
- tw->work = biggerWorkArray;
|
|
|
- UA_Guid *biggerWorkIds = UA_realloc(tw->workIds, sizeof(UA_Guid)*tw->workSize);
|
|
|
- if(!biggerWorkIds)
|
|
|
+ if(newWork->pointers.le_next)
|
|
|
+ newWork->pointers.le_next->pointers.le_prev = &newWork->pointers.le_next;
|
|
|
+ if(newWork->pointers.le_prev)
|
|
|
+ *newWork->pointers.le_prev = newWork;
|
|
|
+ } else {
|
|
|
+ /* create a new entry */
|
|
|
+ newWork = UA_malloc(sizeof(struct TimedWork) + sizeof(UA_WorkItem));
|
|
|
+ if(!newWork)
|
|
|
return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
- tw->workIds = biggerWorkIds;
|
|
|
- tw->work[tw->workSize-1] = *item;
|
|
|
- tw->workIds[tw->workSize-1] = UA_Guid_random(&server->random_seed);
|
|
|
- if(resultWorkGuid)
|
|
|
- *resultWorkGuid = tw->workIds[tw->workSize-1];
|
|
|
- return UA_STATUSCODE_GOOD;
|
|
|
- }
|
|
|
-
|
|
|
- // create a new entry
|
|
|
- if(!(tw = UA_malloc(sizeof(UA_TimedWork))))
|
|
|
- return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
- if(!(tw->work = UA_malloc(sizeof(UA_WorkItem)))) {
|
|
|
- UA_free(tw);
|
|
|
- return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
+ newWork->workSize = 0;
|
|
|
+ newWork->nextTime = firstTime;
|
|
|
+ newWork->interval = repetitionInterval;
|
|
|
+ if(lastTw)
|
|
|
+ LIST_INSERT_AFTER(lastTw, newWork, pointers);
|
|
|
+ else
|
|
|
+ LIST_INSERT_HEAD(&server->timedWork, newWork, pointers);
|
|
|
}
|
|
|
- if(!(tw->workIds = UA_malloc(sizeof(UA_Guid)))) {
|
|
|
- UA_free(tw->work);
|
|
|
- UA_free(tw);
|
|
|
- return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
+ newWork->work[newWork->workSize].work = *item;
|
|
|
+ if(resultWorkGuid) {
|
|
|
+ newWork->work[newWork->workSize].workId = UA_Guid_random(&server->random_seed);
|
|
|
+ *resultWorkGuid = newWork->work[matchingTw->workSize - 1].workId;
|
|
|
}
|
|
|
-
|
|
|
- tw->workSize = 1;
|
|
|
- tw->time = firstTime;
|
|
|
- tw->repetitionInterval = repetitionInterval;
|
|
|
- tw->work[0] = *item;
|
|
|
- tw->workIds[0] = UA_Guid_random(&server->random_seed);
|
|
|
- if(lastTw)
|
|
|
- LIST_INSERT_AFTER(lastTw, tw, pointers);
|
|
|
- else
|
|
|
- LIST_INSERT_HEAD(&server->timedWork, tw, pointers);
|
|
|
-
|
|
|
- if(resultWorkGuid)
|
|
|
- *resultWorkGuid = tw->workIds[0];
|
|
|
-
|
|
|
+ newWork->workSize++;
|
|
|
return UA_STATUSCODE_GOOD;
|
|
|
}
|
|
|
|
|
@@ -216,18 +207,18 @@ UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *w
|
|
|
|
|
|
UA_StatusCode UA_Server_addRepeatedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_UInt32 interval,
|
|
|
UA_Guid *resultWorkGuid) {
|
|
|
- return addTimedWork(server, work, UA_DateTime_now() + interval, interval, resultWorkGuid);
|
|
|
+ return addTimedWork(server, work, UA_DateTime_now() + interval * 1000, interval * 1000, resultWorkGuid);
|
|
|
}
|
|
|
|
|
|
/** Dispatches timed work, returns the timeout until the next timed work in ms */
|
|
|
static UA_UInt16 processTimedWork(UA_Server *server) {
|
|
|
UA_DateTime current = UA_DateTime_now();
|
|
|
- UA_TimedWork *next = LIST_FIRST(&server->timedWork);
|
|
|
- UA_TimedWork *tw = UA_NULL;
|
|
|
+ struct TimedWork *next = LIST_FIRST(&server->timedWork);
|
|
|
+ struct TimedWork *tw = UA_NULL;
|
|
|
|
|
|
while(next) {
|
|
|
tw = next;
|
|
|
- if(tw->time > current)
|
|
|
+ if(tw->nextTime > current)
|
|
|
break;
|
|
|
next = LIST_NEXT(tw, pointers);
|
|
|
|
|
@@ -239,9 +230,9 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
|
|
|
dispatchWork(server, tw->workSize, workCopy); // frees the work pointer
|
|
|
tw->time += tw->repetitionInterval;
|
|
|
|
|
|
- UA_TimedWork *prevTw = tw; // after which tw do we insert?
|
|
|
+ struct TimedWork *prevTw = tw; // after which tw do we insert?
|
|
|
while(UA_TRUE) {
|
|
|
- UA_TimedWork *n = LIST_NEXT(prevTw, pointers);
|
|
|
+ struct TimedWork *n = LIST_NEXT(prevTw, pointers);
|
|
|
if(!n || n->time > tw->time)
|
|
|
break;
|
|
|
prevTw = n;
|
|
@@ -261,12 +252,12 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
|
|
|
processWork(server, tw->work, tw->workSize); // does not free the work
|
|
|
|
|
|
// 2) If the work is repeated, add it back into the list. Otherwise remove it.
|
|
|
- if(tw->repetitionInterval > 0) {
|
|
|
- tw->time += tw->repetitionInterval;
|
|
|
- UA_TimedWork *prevTw = tw;
|
|
|
+ if(tw->interval > 0) {
|
|
|
+ tw->nextTime += tw->interval * 10;
|
|
|
+ struct TimedWork *prevTw = tw;
|
|
|
while(UA_TRUE) {
|
|
|
- UA_TimedWork *n = LIST_NEXT(prevTw, pointers);
|
|
|
- if(!n || n->time > tw->time)
|
|
|
+ struct TimedWork *n = LIST_NEXT(prevTw, pointers);
|
|
|
+ if(!n || n->nextTime > tw->nextTime)
|
|
|
break;
|
|
|
prevTw = n;
|
|
|
}
|
|
@@ -276,35 +267,29 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
|
|
|
}
|
|
|
} else {
|
|
|
LIST_REMOVE(tw, pointers);
|
|
|
- UA_free(tw->work);
|
|
|
- UA_free(tw->workIds);
|
|
|
UA_free(tw);
|
|
|
}
|
|
|
#endif
|
|
|
}
|
|
|
|
|
|
// check if the next timed work is sooner than the usual timeout
|
|
|
- UA_TimedWork *first = LIST_FIRST(&server->timedWork);
|
|
|
- UA_Int32 timeout = MAXTIMEOUT;
|
|
|
+ struct TimedWork *first = LIST_FIRST(&server->timedWork);
|
|
|
+ UA_UInt16 timeout = MAXTIMEOUT;
|
|
|
if(first) {
|
|
|
- timeout = (first->time - current)/10;
|
|
|
+ timeout = (first->nextTime - current)/10000;
|
|
|
if(timeout > MAXTIMEOUT)
|
|
|
return MAXTIMEOUT;
|
|
|
- if(timeout < 0)
|
|
|
- return 0;
|
|
|
}
|
|
|
return timeout;
|
|
|
}
|
|
|
|
|
|
void UA_Server_deleteTimedWork(UA_Server *server) {
|
|
|
- UA_TimedWork *current;
|
|
|
- UA_TimedWork *next = LIST_FIRST(&server->timedWork);
|
|
|
+ struct TimedWork *current;
|
|
|
+ struct TimedWork *next = LIST_FIRST(&server->timedWork);
|
|
|
while(next) {
|
|
|
current = next;
|
|
|
next = LIST_NEXT(current, pointers);
|
|
|
LIST_REMOVE(current, pointers);
|
|
|
- UA_free(current->work);
|
|
|
- UA_free(current->workIds);
|
|
|
UA_free(current);
|
|
|
}
|
|
|
}
|
|
@@ -317,15 +302,15 @@ void UA_Server_deleteTimedWork(UA_Server *server) {
|
|
|
|
|
|
#define DELAYEDWORKSIZE 100 // Collect delayed work until we have DELAYEDWORKSIZE items
|
|
|
|
|
|
-struct UA_DelayedWork {
|
|
|
- UA_DelayedWork *next;
|
|
|
+struct DelayedWork {
|
|
|
+ struct DelayedWork *next;
|
|
|
UA_UInt32 *workerCounters; // initially UA_NULL until a workitem gets the counters
|
|
|
UA_UInt32 workItemsCount; // the size of the array is DELAYEDWORKSIZE, the count may be less
|
|
|
UA_WorkItem *workItems; // when it runs full, a new delayedWork entry is created
|
|
|
};
|
|
|
|
|
|
// Dispatched as a methodcall-WorkItem when the delayedwork is added
|
|
|
-static void getCounters(UA_Server *server, UA_DelayedWork *delayed) {
|
|
|
+static void getCounters(UA_Server *server, DelayedWork *delayed) {
|
|
|
UA_UInt32 *counters = UA_malloc(server->nThreads * sizeof(UA_UInt32));
|
|
|
for(UA_UInt16 i = 0;i<server->nThreads;i++)
|
|
|
counters[i] = *server->workerCounters[i];
|
|
@@ -336,9 +321,9 @@ static void getCounters(UA_Server *server, UA_DelayedWork *delayed) {
|
|
|
// server->delayedWork. processDelayedWorkQueue modifies the "next" (after the
|
|
|
// head).
|
|
|
static void addDelayedWork(UA_Server *server, UA_WorkItem work) {
|
|
|
- UA_DelayedWork *dw = server->delayedWork;
|
|
|
+ struct DelayedWork *dw = server->delayedWork;
|
|
|
if(!dw || dw->workItemsCount >= DELAYEDWORKSIZE) {
|
|
|
- UA_DelayedWork *newwork = UA_malloc(sizeof(UA_DelayedWork));
|
|
|
+ struct DelayedWork *newwork = UA_malloc(sizeof(DelayedWork));
|
|
|
newwork->workItems = UA_malloc(sizeof(UA_WorkItem)*DELAYEDWORKSIZE);
|
|
|
newwork->workItemsCount = 0;
|
|
|
newwork->workerCounters = UA_NULL;
|
|
@@ -361,10 +346,10 @@ static void addDelayedWork(UA_Server *server, UA_WorkItem work) {
|
|
|
}
|
|
|
|
|
|
static void processDelayedWork(UA_Server *server) {
|
|
|
- UA_DelayedWork *dw = server->delayedWork;
|
|
|
+ struct DelayedWork *dw = server->delayedWork;
|
|
|
while(dw) {
|
|
|
processWork(server, dw->workItems, dw->workItemsCount);
|
|
|
- UA_DelayedWork *next = dw->next;
|
|
|
+ struct DelayedWork *next = dw->next;
|
|
|
UA_free(dw->workerCounters);
|
|
|
UA_free(dw->workItems);
|
|
|
UA_free(dw);
|
|
@@ -374,9 +359,9 @@ static void processDelayedWork(UA_Server *server) {
|
|
|
|
|
|
// Execute this every N seconds (repeated work) to execute delayed work that is ready
|
|
|
static void dispatchDelayedWork(UA_Server *server, void *data /* not used, but needed for the signature*/) {
|
|
|
- UA_DelayedWork *dw = UA_NULL;
|
|
|
- UA_DelayedWork *readydw = UA_NULL;
|
|
|
- UA_DelayedWork *beforedw = server->delayedWork;
|
|
|
+ struct DelayedWork *dw = UA_NULL;
|
|
|
+ struct DelayedWork *readydw = UA_NULL;
|
|
|
+ struct DelayedWork *beforedw = server->delayedWork;
|
|
|
|
|
|
// start at the second...
|
|
|
if(beforedw)
|
|
@@ -444,8 +429,8 @@ UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *r
|
|
|
#endif
|
|
|
|
|
|
// 2) Start the networklayers
|
|
|
- for(UA_Int32 i=0;i<server->nlsSize;i++)
|
|
|
- server->nls[i].start(server->nls[i].nlHandle, &server->logger);
|
|
|
+ for(size_t i = 0; i <server->networkLayersSize; i++)
|
|
|
+ server->networkLayers[i].start(server->networkLayers[i].nlHandle, &server->logger);
|
|
|
|
|
|
//3) The loop
|
|
|
while(1) {
|
|
@@ -453,17 +438,17 @@ UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *r
|
|
|
UA_UInt16 timeout = processTimedWork(server);
|
|
|
|
|
|
// 3.2) Get work from the networklayer and dispatch it
|
|
|
- for(UA_Int32 i=0;i<server->nlsSize;i++) {
|
|
|
- UA_ServerNetworkLayer *nl = &server->nls[i];
|
|
|
+ for(size_t i = 0; i < server->networkLayersSize; i++) {
|
|
|
+ UA_ServerNetworkLayer *nl = &server->networkLayers[i];
|
|
|
UA_WorkItem *work;
|
|
|
UA_Int32 workSize;
|
|
|
if(*running) {
|
|
|
- if(i == server->nlsSize-1)
|
|
|
+ if(i == server->networkLayersSize-1)
|
|
|
workSize = nl->getWork(nl->nlHandle, &work, timeout);
|
|
|
else
|
|
|
workSize = nl->getWork(nl->nlHandle, &work, 0);
|
|
|
} else {
|
|
|
- workSize = server->nls[i].stop(nl->nlHandle, &work);
|
|
|
+ workSize = server->networkLayers[i].stop(nl->nlHandle, &work);
|
|
|
}
|
|
|
|
|
|
#ifdef UA_MULTITHREADING
|