ua_client_worker.c 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. #include "ua_util.h"
  5. #include "ua_client.h"
  6. #include "ua_client_internal.h"
  7. /**
  8. * Worker Threads and Dispatch Queue
  9. * ---------------------------------
  10. * The worker threads dequeue callbacks from a central Multi-Producer
  11. * Multi-Consumer Queue (MPMC). When there are no callbacks, workers go idle.
  12. * The condition to wake them up is triggered whenever a callback is
  13. * dispatched.
  14. *
  15. * Future Plans: Use work-stealing to load-balance between cores.
  16. * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
  17. * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
  18. /**
  19. * Repeated Callbacks
  20. * ------------------
  21. * Repeated Callbacks are handled by UA_Timer (used in both client and server).
  22. * In the multi-threaded case, callbacks are dispatched to workers. Otherwise,
  23. * they are executed immediately. */
  24. void UA_Client_workerCallback(UA_Client *client, UA_ClientCallback callback,
  25. void *data) {
  26. /* Execute immediately */
  27. callback(client, data);
  28. }
  29. /**
  30. * Delayed Callbacks
  31. * -----------------
  32. *
  33. * Delayed Callbacks are called only when all callbacks that were dispatched
  34. * prior are finished. In the single-threaded case, the callback is added to a
  35. * singly-linked list that is processed at the end of the client's main-loop. In
  36. * the multi-threaded case, the delay is ensured by a three-step procedure:
  37. *
  38. * 1. The delayed callback is dispatched to the worker queue. So it is only
  39. * dequeued when all prior callbacks have been dequeued.
  40. *
  41. * 2. When the callback is first dequeued by a worker, sample the counter of all
  42. * workers. Once all counters have advanced, the callback is ready.
  43. *
  44. * 3. Check regularly if the callback is ready by adding it back to the dispatch
  45. * queue. */
  46. typedef struct UA_DelayedClientCallback {
  47. SLIST_ENTRY(UA_DelayedClientCallback)
  48. next;
  49. UA_ClientCallback callback;
  50. void *data;
  51. } UA_DelayedClientCallback;
  52. UA_StatusCode UA_Client_delayedCallback(UA_Client *client,
  53. UA_ClientCallback callback, void *data) {
  54. UA_DelayedClientCallback *dc = (UA_DelayedClientCallback*) UA_malloc(
  55. sizeof(UA_DelayedClientCallback));
  56. if (!dc)
  57. return UA_STATUSCODE_BADOUTOFMEMORY;
  58. dc->callback = callback;
  59. dc->data = data;
  60. SLIST_INSERT_HEAD(&client->delayedClientCallbacks, dc, next);
  61. return UA_STATUSCODE_GOOD;
  62. }
  63. void
  64. processDelayedClientCallbacks(UA_Client *client);
  65. void processDelayedClientCallbacks(UA_Client *client) {
  66. UA_DelayedClientCallback *dc, *dc_tmp;
  67. SLIST_FOREACH_SAFE(dc, &client->delayedClientCallbacks, next, dc_tmp)
  68. {
  69. SLIST_REMOVE(&client->delayedClientCallbacks, dc,
  70. UA_DelayedClientCallback, next);
  71. dc->callback(client, dc->data);
  72. UA_free(dc);
  73. }
  74. }
  75. static void
  76. asyncServiceTimeoutCheck(UA_Client *client) {
  77. UA_DateTime now = UA_DateTime_nowMonotonic();
  78. /* Timeout occurs, remove the callback */
  79. AsyncServiceCall *ac, *ac_tmp;
  80. LIST_FOREACH_SAFE(ac, &client->asyncServiceCalls, pointers, ac_tmp) {
  81. if(!ac->timeout)
  82. continue;
  83. if(ac->start + (UA_DateTime)(ac->timeout * UA_DATETIME_MSEC) <= now) {
  84. LIST_REMOVE(ac, pointers);
  85. UA_Client_AsyncService_cancel(client, ac, UA_STATUSCODE_BADTIMEOUT);
  86. UA_free(ac);
  87. }
  88. }
  89. }
  90. static void
  91. backgroundConnectivityCallback(UA_Client *client, void *userdata,
  92. UA_UInt32 requestId, const UA_ReadResponse *response) {
  93. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTIMEOUT) {
  94. if (client->config.inactivityCallback)
  95. client->config.inactivityCallback(client);
  96. }
  97. client->pendingConnectivityCheck = false;
  98. client->lastConnectivityCheck = UA_DateTime_nowMonotonic();
  99. }
  100. static UA_StatusCode
  101. UA_Client_backgroundConnectivity(UA_Client *client) {
  102. if(!client->config.connectivityCheckInterval)
  103. return UA_STATUSCODE_GOOD;
  104. if (client->pendingConnectivityCheck)
  105. return UA_STATUSCODE_GOOD;
  106. UA_DateTime now = UA_DateTime_nowMonotonic();
  107. UA_DateTime nextDate = client->lastConnectivityCheck + (UA_DateTime)(client->config.connectivityCheckInterval * UA_DATETIME_MSEC);
  108. if(now <= nextDate)
  109. return UA_STATUSCODE_GOOD;
  110. UA_ReadRequest request;
  111. UA_ReadRequest_init(&request);
  112. UA_ReadValueId rvid;
  113. UA_ReadValueId_init(&rvid);
  114. rvid.attributeId = UA_ATTRIBUTEID_VALUE;
  115. rvid.nodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_STATE);
  116. request.nodesToRead = &rvid;
  117. request.nodesToReadSize = 1;
  118. UA_StatusCode retval = __UA_Client_AsyncService(client, &request, &UA_TYPES[UA_TYPES_READREQUEST],
  119. (UA_ClientAsyncServiceCallback)backgroundConnectivityCallback,
  120. &UA_TYPES[UA_TYPES_READRESPONSE], NULL, NULL);
  121. client->pendingConnectivityCheck = true;
  122. return retval;
  123. }
  124. /**
  125. * Main Client Loop
  126. * ----------------
  127. * Start: Spin up the workers and the network layer
  128. * Iterate: Process repeated callbacks and events in the network layer.
  129. * This part can be driven from an external main-loop in an
  130. * event-driven single-threaded architecture.
  131. * Stop: Stop workers, finish all callbacks, stop the network layer,
  132. * clean up */
  133. UA_StatusCode UA_Client_run_iterate(UA_Client *client, UA_UInt16 timeout) {
  134. // TODO connectivity check & timeout features for the async implementation (timeout == 0)
  135. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  136. #ifdef UA_ENABLE_SUBSCRIPTIONS
  137. UA_StatusCode retvalPublish = UA_Client_Subscriptions_backgroundPublish(client);
  138. if(client->state >= UA_CLIENTSTATE_SESSION && retvalPublish != UA_STATUSCODE_GOOD)
  139. return retvalPublish;
  140. #endif
  141. /* Make sure we have an open channel */
  142. /************************************************************/
  143. /* FIXME: This is a dirty workaround */
  144. if(client->state >= UA_CLIENTSTATE_SECURECHANNEL)
  145. retval = openSecureChannel(client, true);
  146. /* FIXME: Will most likely break somewhere in the future */
  147. /************************************************************/
  148. if(timeout) {
  149. if(retval != UA_STATUSCODE_GOOD)
  150. return retval;
  151. retval = UA_Client_backgroundConnectivity(client);
  152. if(retval != UA_STATUSCODE_GOOD)
  153. return retval;
  154. UA_DateTime maxDate = UA_DateTime_nowMonotonic() + (timeout * UA_DATETIME_MSEC);
  155. retval = receiveServiceResponse(client, NULL, NULL, maxDate, NULL);
  156. if(retval == UA_STATUSCODE_GOODNONCRITICALTIMEOUT)
  157. retval = UA_STATUSCODE_GOOD;
  158. }
  159. else{
  160. UA_DateTime now = UA_DateTime_nowMonotonic();
  161. UA_Timer_process(&client->timer, now,
  162. (UA_TimerDispatchCallback) UA_Client_workerCallback, client);
  163. UA_ClientState cs = UA_Client_getState(client);
  164. retval = UA_Client_connect_iterate(client);
  165. /* Connection failed, drop the rest */
  166. if(retval != UA_STATUSCODE_GOOD)
  167. return retval;
  168. if((cs == UA_CLIENTSTATE_SECURECHANNEL) || (cs == UA_CLIENTSTATE_SESSION)) {
  169. /* Check for new data */
  170. retval = receiveServiceResponseAsync(client, NULL, NULL);
  171. } else {
  172. retval = receivePacketAsync(client);
  173. }
  174. }
  175. #ifdef UA_ENABLE_SUBSCRIPTIONS
  176. /* The inactivity check must be done after receiveServiceResponse*/
  177. UA_Client_Subscriptions_backgroundPublishInactivityCheck(client);
  178. #endif
  179. asyncServiceTimeoutCheck(client);
  180. #ifndef UA_ENABLE_MULTITHREADING
  181. /* Process delayed callbacks when all callbacks and
  182. * network events are done */
  183. processDelayedClientCallbacks(client);
  184. #endif
  185. return retval;
  186. }