ua_server_methodqueue.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2019 (c) Fraunhofer IOSB (Author: Klaus Schick)
  6. */
  7. #include "ua_server_methodqueue.h"
  8. #include <open62541/types_generated_handling.h>
  9. #include "ua_server_internal.h"
  10. #include <open62541/plugin/log.h>
  11. #if UA_MULTITHREADING >= 100
  12. /* Initialize Request Queue */
  13. void
  14. UA_Server_MethodQueues_init(UA_Server *server) {
  15. server->nMQCurSize = 0;
  16. UA_LOCK_INIT(server->ua_request_queue_lock);
  17. SIMPLEQ_INIT(&server->ua_method_request_queue);
  18. UA_LOCK_INIT(server->ua_response_queue_lock);
  19. SIMPLEQ_INIT(&server->ua_method_response_queue);
  20. UA_LOCK_INIT(server->ua_pending_list_lock);
  21. SIMPLEQ_INIT(&server->ua_method_pending_list);
  22. /* Add a regular callback for cleanup and maintenance using a 10s interval. */
  23. UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_Server_CheckQueueIntegrity,
  24. NULL, 10000.0, &server->nCBIdIntegrity);
  25. }
  26. /* Cleanup and terminate queues */
  27. void
  28. UA_Server_MethodQueues_delete(UA_Server *server) {
  29. UA_Server_removeCallback(server, server->nCBIdIntegrity);
  30. /* Clean up request queue */
  31. UA_LOCK(server->ua_request_queue_lock);
  32. while (!SIMPLEQ_EMPTY(&server->ua_method_request_queue)) {
  33. struct AsyncMethodQueueElement* request = SIMPLEQ_FIRST(&server->ua_method_request_queue);
  34. SIMPLEQ_REMOVE_HEAD(&server->ua_method_request_queue, next);
  35. UA_Server_DeleteMethodQueueElement(server, request);
  36. }
  37. UA_UNLOCK(server->ua_request_queue_lock);
  38. /* Clean up response queue */
  39. UA_LOCK(server->ua_response_queue_lock);
  40. while (!SIMPLEQ_EMPTY(&server->ua_method_response_queue)) {
  41. struct AsyncMethodQueueElement* response = SIMPLEQ_FIRST(&server->ua_method_response_queue);
  42. SIMPLEQ_REMOVE_HEAD(&server->ua_method_response_queue, next);
  43. UA_Server_DeleteMethodQueueElement(server, response);
  44. }
  45. UA_UNLOCK(server->ua_response_queue_lock);
  46. /* Clear all pending */
  47. UA_LOCK(server->ua_pending_list_lock);
  48. while (!SIMPLEQ_EMPTY(&server->ua_method_pending_list)) {
  49. struct AsyncMethodQueueElement* response = SIMPLEQ_FIRST(&server->ua_method_pending_list);
  50. SIMPLEQ_REMOVE_HEAD(&server->ua_method_pending_list, next);
  51. UA_Server_DeleteMethodQueueElement(server, response);
  52. }
  53. UA_UNLOCK(server->ua_pending_list_lock);
  54. /* delete all locks */
  55. /* TODO KS: actually we should make sure the worker is not 'hanging' on this lock anymore */
  56. Sleep(100);
  57. UA_LOCK_DESTROY(server->ua_response_queue_lock);
  58. UA_LOCK_DESTROY(server->ua_request_queue_lock);
  59. UA_LOCK_DESTROY(server->ua_pending_list_lock);
  60. }
  61. /* Checks queue element timeouts */
  62. void
  63. UA_Server_CheckQueueIntegrity(UA_Server *server, void *data) {
  64. /* for debugging/testing purposes */
  65. if(server->config.asyncOperationTimeout <= 0.0) {
  66. UA_AsyncMethodManager_checkTimeouts(server, &server->asyncMethodManager);
  67. return;
  68. }
  69. /* To prevent a lockup, we remove a maximum 10% of timed out entries */
  70. /* on small queues, we do at least 3 */
  71. size_t bMaxRemove = server->config.maxAsyncOperationQueueSize / 10;
  72. if(bMaxRemove < 3)
  73. bMaxRemove = 3;
  74. UA_Boolean bCheckQueue = UA_TRUE;
  75. UA_LOCK(server->ua_request_queue_lock);
  76. /* Check if entry has been in the queue too long time */
  77. while(bCheckQueue && bMaxRemove-- && !SIMPLEQ_EMPTY(&server->ua_method_request_queue)) {
  78. struct AsyncMethodQueueElement* request_elem = SIMPLEQ_FIRST(&server->ua_method_request_queue);
  79. if (request_elem) {
  80. UA_DateTime tNow = UA_DateTime_now();
  81. UA_DateTime tReq = request_elem->m_tDispatchTime;
  82. UA_DateTime diff = tNow - tReq;
  83. if(diff > (UA_DateTime)(server->config.asyncOperationTimeout * UA_DATETIME_MSEC)) {
  84. /* remove it from the queue */
  85. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  86. "UA_Server_CheckQueueIntegrity: Request #%u was removed due to a timeout (%f)",
  87. request_elem->m_nRequestId, server->config.asyncOperationTimeout);
  88. SIMPLEQ_REMOVE_HEAD(&server->ua_method_request_queue, next);
  89. server->nMQCurSize--;
  90. /* Notify that we removed this request - e.g. Bad Call Response (UA_STATUSCODE_BADREQUESTTIMEOUT) */
  91. UA_CallMethodResult* result = &request_elem->m_Response;
  92. UA_CallMethodResult_clear(result);
  93. result->statusCode = UA_STATUSCODE_BADREQUESTTIMEOUT;
  94. UA_Server_InsertMethodResponse(server, request_elem->m_nRequestId, &request_elem->m_nSessionId, request_elem->m_nIndex, result);
  95. UA_CallMethodResult_clear(result);
  96. UA_Server_DeleteMethodQueueElement(server, request_elem);
  97. }
  98. else {
  99. /* queue entry is not older than server->nMQTimeoutSecs, so we stop checking */
  100. bCheckQueue = UA_FALSE;
  101. }
  102. }
  103. }
  104. UA_UNLOCK(server->ua_request_queue_lock);
  105. /* Clear all pending */
  106. bCheckQueue = UA_TRUE;
  107. UA_LOCK(server->ua_pending_list_lock);
  108. /* Check if entry has been in the pendig list too long time */
  109. while(bCheckQueue && !SIMPLEQ_EMPTY(&server->ua_method_pending_list)) {
  110. struct AsyncMethodQueueElement* request_elem = SIMPLEQ_FIRST(&server->ua_method_pending_list);
  111. if (request_elem) {
  112. UA_DateTime tNow = UA_DateTime_now();
  113. UA_DateTime tReq = request_elem->m_tDispatchTime;
  114. UA_DateTime diff = tNow - tReq;
  115. if (diff > (UA_DateTime)(server->config.asyncOperationTimeout * UA_DATETIME_MSEC)) {
  116. /* remove it from the list */
  117. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  118. "UA_Server_CheckQueueIntegrity: Pending request #%u was removed due to a timeout (%f)",
  119. request_elem->m_nRequestId, server->config.asyncOperationTimeout);
  120. SIMPLEQ_REMOVE_HEAD(&server->ua_method_pending_list, next);
  121. /* Notify that we removed this request - e.g. Bad Call Response (UA_STATUSCODE_BADREQUESTTIMEOUT) */
  122. UA_CallMethodResult* result = &request_elem->m_Response;
  123. UA_CallMethodResult_clear(result);
  124. result->statusCode = UA_STATUSCODE_BADREQUESTTIMEOUT;
  125. UA_Server_InsertMethodResponse(server, request_elem->m_nRequestId, &request_elem->m_nSessionId, request_elem->m_nIndex, result);
  126. UA_CallMethodResult_clear(result);
  127. UA_Server_DeleteMethodQueueElement(server, request_elem);
  128. }
  129. else {
  130. /* list entry is not older than server->nMQTimeoutSecs, so we stop checking */
  131. bCheckQueue = UA_FALSE;
  132. }
  133. }
  134. }
  135. UA_UNLOCK(server->ua_pending_list_lock);
  136. /* Now we check if we still have pending CallRequests */
  137. UA_AsyncMethodManager_checkTimeouts(server, &server->asyncMethodManager);
  138. }
  139. /* Enqueue next MethodRequest */
  140. UA_StatusCode
  141. UA_Server_SetNextAsyncMethod(UA_Server *server,
  142. const UA_UInt32 nRequestId,
  143. const UA_NodeId *nSessionId,
  144. const UA_UInt32 nIndex,
  145. const UA_CallMethodRequest *pRequest) {
  146. UA_StatusCode result = UA_STATUSCODE_GOOD;
  147. if (server->config.maxAsyncOperationQueueSize == 0 ||
  148. server->nMQCurSize < server->config.maxAsyncOperationQueueSize) {
  149. struct AsyncMethodQueueElement* elem = (struct AsyncMethodQueueElement*)UA_calloc(1, sizeof(struct AsyncMethodQueueElement));
  150. if (elem)
  151. {
  152. UA_CallMethodRequest_init(&elem->m_Request);
  153. result = UA_CallMethodRequest_copy(pRequest, &elem->m_Request);
  154. if (result != UA_STATUSCODE_GOOD) {
  155. UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
  156. "UA_Server_SetAsyncMethodResult: UA_CallMethodRequest_copy failed.");
  157. }
  158. elem->m_nRequestId = nRequestId;
  159. elem->m_nSessionId = *nSessionId;
  160. elem->m_nIndex = nIndex;
  161. UA_CallMethodResult_clear(&elem->m_Response);
  162. elem->m_tDispatchTime = UA_DateTime_now();
  163. UA_LOCK(server->ua_request_queue_lock);
  164. SIMPLEQ_INSERT_TAIL(&server->ua_method_request_queue, elem, next);
  165. server->nMQCurSize++;
  166. if(server->config.asyncOperationNotifyCallback)
  167. server->config.asyncOperationNotifyCallback(server);
  168. UA_UNLOCK(server->ua_request_queue_lock);
  169. }
  170. else {
  171. /* notify about error */
  172. UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
  173. "UA_Server_SetNextAsyncMethod: Mem alloc failed.");
  174. result = UA_STATUSCODE_BADOUTOFMEMORY;
  175. }
  176. }
  177. else {
  178. /* issue warning */
  179. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  180. "UA_Server_SetNextAsyncMethod: Queue exceeds limit (%d).",
  181. (UA_UInt32)server->config.maxAsyncOperationQueueSize);
  182. result = UA_STATUSCODE_BADUNEXPECTEDERROR;
  183. }
  184. return result;
  185. }
  186. /* Private API */
  187. /* Get next Method Call Response */
  188. UA_Boolean
  189. UA_Server_GetAsyncMethodResult(UA_Server *server, struct AsyncMethodQueueElement **pResponse) {
  190. UA_Boolean bRV = UA_FALSE;
  191. UA_LOCK(server->ua_response_queue_lock);
  192. if (!SIMPLEQ_EMPTY(&server->ua_method_response_queue)) {
  193. *pResponse = SIMPLEQ_FIRST(&server->ua_method_response_queue);
  194. SIMPLEQ_REMOVE_HEAD(&server->ua_method_response_queue, next);
  195. bRV = UA_TRUE;
  196. }
  197. UA_UNLOCK(server->ua_response_queue_lock);
  198. return bRV;
  199. }
  200. /* Deep delete queue Element - only memory we did allocate */
  201. void
  202. UA_Server_DeleteMethodQueueElement(UA_Server *server, struct AsyncMethodQueueElement *pElem) {
  203. UA_CallMethodRequest_clear(&pElem->m_Request);
  204. UA_CallMethodResult_clear(&pElem->m_Response);
  205. UA_free(pElem);
  206. }
  207. void UA_Server_AddPendingMethodCall(UA_Server* server, struct AsyncMethodQueueElement *pElem) {
  208. UA_LOCK(server->ua_pending_list_lock);
  209. pElem->m_tDispatchTime = UA_DateTime_now(); /* reset timestamp for timeout */
  210. SIMPLEQ_INSERT_TAIL(&server->ua_method_pending_list, pElem, next);
  211. UA_UNLOCK(server->ua_pending_list_lock);
  212. }
  213. void UA_Server_RmvPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem) {
  214. /* Remove element from pending list */
  215. /* Do NOT delete it because we still need it */
  216. struct AsyncMethodQueueElement* current = NULL;
  217. struct AsyncMethodQueueElement* tmp_iter = NULL;
  218. struct AsyncMethodQueueElement* previous = NULL;
  219. UA_LOCK(server->ua_pending_list_lock);
  220. SIMPLEQ_FOREACH_SAFE(current, &server->ua_method_pending_list, next, tmp_iter) {
  221. if (pElem == current) {
  222. if (previous == NULL)
  223. SIMPLEQ_REMOVE_HEAD(&server->ua_method_pending_list, next);
  224. else
  225. SIMPLEQ_REMOVE_AFTER(&server->ua_method_pending_list, previous, next);
  226. break;
  227. }
  228. previous = current;
  229. }
  230. UA_UNLOCK(server->ua_pending_list_lock);
  231. return;
  232. }
  233. UA_Boolean UA_Server_IsPendingMethodCall(UA_Server *server, struct AsyncMethodQueueElement *pElem) {
  234. UA_Boolean bRV = UA_FALSE;
  235. struct AsyncMethodQueueElement* current = NULL;
  236. struct AsyncMethodQueueElement* tmp_iter = NULL;
  237. UA_LOCK(server->ua_pending_list_lock);
  238. SIMPLEQ_FOREACH_SAFE(current, &server->ua_method_pending_list, next, tmp_iter) {
  239. if (pElem == current) {
  240. bRV = UA_TRUE;
  241. break;
  242. }
  243. }
  244. UA_UNLOCK(server->ua_pending_list_lock);
  245. return bRV;
  246. }
  247. /* ----------------------------------------------------------------- */
  248. /* Public API */
  249. /* Get and remove next Method Call Request */
  250. UA_Boolean
  251. UA_Server_getAsyncOperation(UA_Server *server, UA_AsyncOperationType *type,
  252. const UA_AsyncOperationRequest **request,
  253. void **context) {
  254. UA_Boolean bRV = UA_FALSE;
  255. *type = UA_ASYNCOPERATIONTYPE_INVALID;
  256. struct AsyncMethodQueueElement *elem = NULL;
  257. UA_LOCK(server->ua_request_queue_lock);
  258. if (!SIMPLEQ_EMPTY(&server->ua_method_request_queue)) {
  259. elem = SIMPLEQ_FIRST(&server->ua_method_request_queue);
  260. SIMPLEQ_REMOVE_HEAD(&server->ua_method_request_queue, next);
  261. server->nMQCurSize--;
  262. if (elem) {
  263. *request = (UA_AsyncOperationRequest*)&elem->m_Request;
  264. *context = (void*)elem;
  265. bRV = UA_TRUE;
  266. }
  267. else {
  268. UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
  269. "UA_Server_GetNextAsyncMethod: elem is a NULL-Pointer.");
  270. }
  271. }
  272. UA_UNLOCK(server->ua_request_queue_lock);
  273. if (bRV && elem) {
  274. *type = UA_ASYNCOPERATIONTYPE_CALL;
  275. UA_Server_AddPendingMethodCall(server, elem);
  276. }
  277. return bRV;
  278. }
  279. /* Worker submits Method Call Response */
  280. void
  281. UA_Server_setAsyncOperationResult(UA_Server *server,
  282. const UA_AsyncOperationResponse *response,
  283. void *context) {
  284. struct AsyncMethodQueueElement* elem = (struct AsyncMethodQueueElement*)context;
  285. if (!elem || !UA_Server_IsPendingMethodCall(server, elem) ) {
  286. /* Something went wrong, late call? */
  287. /* Dismiss response */
  288. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  289. "UA_Server_SetAsyncMethodResult: elem is a NULL-Pointer or not valid anymore.");
  290. }
  291. else {
  292. /* UA_Server_RmvPendingMethodCall MUST be called outside the lock
  293. * otherwise we can run into a deadlock */
  294. UA_Server_RmvPendingMethodCall(server, elem);
  295. UA_StatusCode result = UA_CallMethodResult_copy(&response->callMethodResult,
  296. &elem->m_Response);
  297. if (result != UA_STATUSCODE_GOOD) {
  298. UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
  299. "UA_Server_SetAsyncMethodResult: UA_CallMethodResult_copy failed.");
  300. /* Add failed CallMethodResult to response queue */
  301. UA_CallMethodResult_clear(&elem->m_Response);
  302. elem->m_Response.statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
  303. }
  304. /* Insert response in queue */
  305. UA_LOCK(server->ua_response_queue_lock);
  306. SIMPLEQ_INSERT_TAIL(&server->ua_method_response_queue, elem, next);
  307. UA_UNLOCK(server->ua_response_queue_lock);
  308. }
  309. }
  310. #endif /* !UA_MULTITHREADING >= 100 */