123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- #include "ua_workqueue.h"
- void UA_WorkQueue_init(UA_WorkQueue *wq) {
-
- SIMPLEQ_INIT(&wq->delayedCallbacks);
- #ifdef UA_ENABLE_MULTITHREADING
- wq->delayedCallbacks_checkpoint = NULL;
- pthread_mutex_init(&wq->delayedCallbacks_accessMutex, NULL);
-
- SIMPLEQ_INIT(&wq->dispatchQueue);
- pthread_mutex_init(&wq->dispatchQueue_accessMutex, NULL);
- pthread_cond_init(&wq->dispatchQueue_condition, NULL);
- pthread_mutex_init(&wq->dispatchQueue_conditionMutex, NULL);
- #endif
- }
- #ifdef UA_ENABLE_MULTITHREADING
- static void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq);
- #endif
- void UA_WorkQueue_cleanup(UA_WorkQueue *wq) {
- #ifdef UA_ENABLE_MULTITHREADING
-
- UA_WorkQueue_stop(wq);
-
- while(true) {
- pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
- UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
- if(!dc) {
- pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
- break;
- }
- SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
- pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
- dc->callback(dc->application, dc->data);
- UA_free(dc);
- }
- #endif
-
- UA_WorkQueue_manuallyProcessDelayed(wq);
- #ifdef UA_ENABLE_MULTITHREADING
- wq->delayedCallbacks_checkpoint = NULL;
- pthread_mutex_destroy(&wq->dispatchQueue_accessMutex);
- pthread_cond_destroy(&wq->dispatchQueue_condition);
- pthread_mutex_destroy(&wq->dispatchQueue_conditionMutex);
- pthread_mutex_destroy(&wq->delayedCallbacks_accessMutex);
- #endif
- }
- #ifdef UA_ENABLE_MULTITHREADING
- static void *
- workerLoop(UA_Worker *worker) {
- UA_WorkQueue *wq = worker->queue;
- UA_UInt32 *counter = &worker->counter;
- volatile UA_Boolean *running = &worker->running;
-
- UA_random_seed((uintptr_t)worker);
- while(*running) {
- UA_atomic_addUInt32(counter, 1);
-
- pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
- UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
- if(dc)
- SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
- pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
-
- if(!dc) {
- pthread_mutex_lock(&wq->dispatchQueue_conditionMutex);
- pthread_cond_wait(&wq->dispatchQueue_condition,
- &wq->dispatchQueue_conditionMutex);
- pthread_mutex_unlock(&wq->dispatchQueue_conditionMutex);
- continue;
- }
-
- if(dc->callback)
- dc->callback(dc->application, dc->data);
- UA_free(dc);
- }
- return NULL;
- }
- UA_StatusCode
- UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount) {
- if(wq->workersSize > 0 || workersCount == 0)
- return UA_STATUSCODE_BADINTERNALERROR;
-
-
- wq->workers = (UA_Worker*)UA_calloc(workersCount, sizeof(UA_Worker));
- if(!wq->workers)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- wq->workersSize = workersCount;
-
- for(size_t i = 0; i < workersCount; ++i) {
- UA_Worker *w = &wq->workers[i];
- w->queue = wq;
- w->counter = 0;
- w->running = true;
- pthread_create(&w->thread, NULL, (void* (*)(void*))workerLoop, w);
- }
- return UA_STATUSCODE_GOOD;
- }
- void UA_WorkQueue_stop(UA_WorkQueue *wq) {
- if(wq->workersSize == 0)
- return;
-
- for(size_t i = 0; i < wq->workersSize; ++i)
- wq->workers[i].running = false;
-
- pthread_cond_broadcast(&wq->dispatchQueue_condition);
-
- for(size_t i = 0; i < wq->workersSize; ++i)
- pthread_join(wq->workers[i].thread, NULL);
- UA_free(wq->workers);
- wq->workers = NULL;
- wq->workersSize = 0;
- }
- void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb,
- void *application, void *data) {
- UA_DelayedCallback *dc = (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
- if(!dc) {
- cb(application, data);
- return;
- }
- dc->callback = cb;
- dc->application = application;
- dc->data = data;
-
- pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
- SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, dc, next);
- pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
-
- pthread_cond_broadcast(&wq->dispatchQueue_condition);
- }
- #endif
- #ifdef UA_ENABLE_MULTITHREADING
- #define UA_MAX_DELAYED_SAMPLE 100
- static void
- dispatchDelayedCallbacks(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
-
- for(size_t i = 0; i < wq->workersSize; ++i) {
- if(wq->workers[i].counter == wq->workers[i].checkpointCounter)
- return;
- }
-
- if(wq->delayedCallbacks_checkpoint != NULL) {
- UA_DelayedCallback *iter, *tmp_iter;
- SIMPLEQ_FOREACH_SAFE(iter, &wq->delayedCallbacks, next, tmp_iter) {
- pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
- SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, iter, next);
- pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
- if(iter == wq->delayedCallbacks_checkpoint)
- break;
- }
- }
-
- for(size_t i = 0; i < wq->workersSize; ++i)
- wq->workers[i].checkpointCounter = wq->workers[i].counter;
- wq->delayedCallbacks_checkpoint = cb;
- }
- #endif
- void
- UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
- #ifdef UA_ENABLE_MULTITHREADING
- pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
- #endif
- SIMPLEQ_INSERT_HEAD(&wq->delayedCallbacks, cb, next);
- #ifdef UA_ENABLE_MULTITHREADING
- wq->delayedCallbacks_sinceDispatch++;
- if(wq->delayedCallbacks_sinceDispatch > UA_MAX_DELAYED_SAMPLE) {
- dispatchDelayedCallbacks(wq, cb);
- wq->delayedCallbacks_sinceDispatch = 0;
- }
- pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
- #endif
- }
- void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq) {
- UA_DelayedCallback *dc, *dc_tmp;
- SIMPLEQ_FOREACH_SAFE(dc, &wq->delayedCallbacks, next, dc_tmp) {
- SIMPLEQ_REMOVE_HEAD(&wq->delayedCallbacks, next);
- if(dc->callback)
- dc->callback(dc->application, dc->data);
- UA_free(dc);
- }
- #ifdef UA_ENABLE_MULTITHREADING
- wq->delayedCallbacks_checkpoint = NULL;
- #endif
- }
|