ua_subscription_monitoreditem.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
  8. * Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2018 (c) Fabian Arndt, Root-Core
  10. */
  11. #include "ua_server_internal.h"
  12. #include "ua_subscription.h"
  13. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  14. /****************/
  15. /* Notification */
  16. /****************/
  17. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  18. static const UA_NodeId overflowEventType =
  19. {0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE}};
  20. static const UA_NodeId simpleOverflowEventType =
  21. {0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE}};
  22. static UA_Boolean
  23. UA_Notification_isOverflowEvent(UA_Server *server, UA_Notification *n) {
  24. UA_MonitoredItem *mon = n->mon;
  25. if(mon->attributeId != UA_ATTRIBUTEID_EVENTNOTIFIER)
  26. return false;
  27. UA_EventFieldList *efl = &n->data.event.fields;
  28. if(efl->eventFieldsSize >= 1 &&
  29. efl->eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
  30. isNodeInTree(server->nsCtx, (const UA_NodeId *)efl->eventFields[0].data,
  31. &overflowEventType, &subtypeId, 1)) {
  32. return true;
  33. }
  34. return false;
  35. }
  36. /* The specification states in Part 4 5.12.1.5 that an EventQueueOverflowEvent
  37. * "is generated when the first Event has to be discarded [...] without
  38. * discarding any other event". So only generate one for all deleted events. */
  39. static UA_StatusCode
  40. createEventOverflowNotification(UA_Server *server, UA_Subscription *sub,
  41. UA_MonitoredItem *mon, UA_Notification *indicator) {
  42. /* Avoid two redundant overflow events in a row */
  43. if(UA_Notification_isOverflowEvent(server, indicator))
  44. return UA_STATUSCODE_GOOD;
  45. /* A notification is inserted into the queue which includes only the
  46. * NodeId of the overflowEventType. It is up to the client to check for
  47. * possible overflows. */
  48. /* Allocate the notification */
  49. UA_Notification *overflowNotification = (UA_Notification *)
  50. UA_malloc(sizeof(UA_Notification));
  51. if(!overflowNotification)
  52. return UA_STATUSCODE_BADOUTOFMEMORY;;
  53. /* Set the notification fields */
  54. overflowNotification->mon = mon;
  55. UA_EventFieldList_init(&overflowNotification->data.event.fields);
  56. overflowNotification->data.event.fields.eventFields = UA_Variant_new();
  57. if(!overflowNotification->data.event.fields.eventFields) {
  58. UA_free(overflowNotification);
  59. return UA_STATUSCODE_BADOUTOFMEMORY;;
  60. }
  61. overflowNotification->data.event.fields.eventFieldsSize = 1;
  62. UA_StatusCode retval =
  63. UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
  64. &simpleOverflowEventType, &UA_TYPES[UA_TYPES_NODEID]);
  65. if(retval != UA_STATUSCODE_GOOD) {
  66. UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
  67. UA_free(overflowNotification);
  68. return retval;
  69. }
  70. /* Insert before the "indicator notification". This is either first in the
  71. * queue (if the oldest notification was removed) or before the new event
  72. * that remains the last element of the queue. */
  73. TAILQ_INSERT_BEFORE(indicator, overflowNotification, listEntry);
  74. ++mon->eventOverflows;
  75. ++mon->queueSize;
  76. TAILQ_NEXT(overflowNotification, globalEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
  77. if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
  78. TAILQ_INSERT_BEFORE(indicator, overflowNotification, globalEntry);
  79. ++sub->notificationQueueSize;
  80. ++sub->eventNotifications;
  81. }
  82. return UA_STATUSCODE_GOOD;
  83. }
  84. #endif
  85. /* !!! The enqueue and dequeue operations need to match the reporting
  86. * disable/enable logic in Operation_SetMonitoringMode !!! */
  87. void
  88. UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
  89. UA_MonitoredItem *mon, UA_Notification *n) {
  90. /* Add to the MonitoredItem */
  91. TAILQ_INSERT_TAIL(&mon->queue, n, listEntry);
  92. ++mon->queueSize;
  93. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  94. if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER &&
  95. UA_Notification_isOverflowEvent(server, n))
  96. ++mon->eventOverflows;
  97. #endif
  98. /* Add to the subscription if reporting is enabled */
  99. TAILQ_NEXT(n, globalEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
  100. if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
  101. TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
  102. ++sub->notificationQueueSize;
  103. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  104. if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
  105. ++sub->eventNotifications;
  106. } else
  107. #endif
  108. {
  109. ++sub->dataChangeNotifications;
  110. }
  111. }
  112. /* Ensure enough space is available in the MonitoredItem. Do this only after
  113. * adding the new Notification. */
  114. UA_MonitoredItem_ensureQueueSpace(server, mon);
  115. }
  116. void
  117. UA_Notification_dequeue(UA_Server *server, UA_Notification *n) {
  118. UA_MonitoredItem *mon = n->mon;
  119. UA_Subscription *sub = mon->subscription;
  120. /* Remove from the MonitoredItem queue */
  121. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  122. if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER &&
  123. UA_Notification_isOverflowEvent(server, n))
  124. --mon->eventOverflows;
  125. #endif
  126. TAILQ_REMOVE(&mon->queue, n, listEntry);
  127. --mon->queueSize;
  128. /* Remove from the subscription's queue */
  129. if(TAILQ_NEXT(n, globalEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
  130. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  131. if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
  132. --sub->eventNotifications;
  133. } else
  134. #endif
  135. {
  136. --sub->dataChangeNotifications;
  137. }
  138. TAILQ_REMOVE(&sub->notificationQueue, n, globalEntry);
  139. --sub->notificationQueueSize;
  140. }
  141. }
  142. void
  143. UA_Notification_delete(UA_Notification *n) {
  144. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  145. UA_MonitoredItem *mon = n->mon;
  146. if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
  147. UA_EventFieldList_deleteMembers(&n->data.event.fields);
  148. /* EventFilterResult currently isn't being used
  149. * UA_EventFilterResult_delete(notification->data.event->result); */
  150. } else
  151. #endif
  152. {
  153. UA_DataValue_deleteMembers(&n->data.value);
  154. }
  155. UA_free(n);
  156. }
  157. /*****************/
  158. /* MonitoredItem */
  159. /*****************/
  160. void
  161. UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
  162. memset(mon, 0, sizeof(UA_MonitoredItem));
  163. mon->subscription = sub;
  164. TAILQ_INIT(&mon->queue);
  165. }
  166. void
  167. UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  168. UA_LOCK_ASSERT(server->serviceMutex, 1);
  169. /* Remove the sampling callback */
  170. UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
  171. /* Remove the queued notifications if attached to a subscription (not a
  172. * local MonitoredItem) */
  173. if(monitoredItem->subscription) {
  174. UA_Notification *notification, *notification_tmp;
  175. TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
  176. listEntry, notification_tmp) {
  177. /* Remove the item from the queues and free the memory */
  178. UA_Notification_dequeue(server, notification);
  179. UA_Notification_delete(notification);
  180. }
  181. }
  182. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  183. if(monitoredItem->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
  184. /* Remove the monitored item from the node queue */
  185. UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
  186. UA_MonitoredItem_removeNodeEventCallback, monitoredItem);
  187. UA_EventFilter_clear(&monitoredItem->filter.eventFilter);
  188. } else
  189. #endif
  190. {
  191. /* UA_DataChangeFilter does not hold dynamic content we need to free */
  192. /* UA_DataChangeFilter_clear(&monitoredItem->filter.dataChangeFilter); */
  193. }
  194. /* Deregister MonitoredItem in userland */
  195. if(server->config.monitoredItemRegisterCallback && monitoredItem->registered) {
  196. /* Get the session context. Local MonitoredItems don't have a subscription. */
  197. UA_Session *session = NULL;
  198. if(monitoredItem->subscription)
  199. session = monitoredItem->subscription->session;
  200. if(!session)
  201. session = &server->adminSession;
  202. /* Get the node context */
  203. void *targetContext = NULL;
  204. getNodeContext(server, monitoredItem->monitoredNodeId, &targetContext);
  205. /* Deregister */
  206. UA_UNLOCK(server->serviceMutex);
  207. server->config.monitoredItemRegisterCallback(server, &session->sessionId,
  208. session->sessionHandle,
  209. &monitoredItem->monitoredNodeId,
  210. targetContext, monitoredItem->attributeId, true);
  211. UA_LOCK(server->serviceMutex);
  212. }
  213. /* Remove the monitored item */
  214. if(monitoredItem->listEntry.le_prev != NULL)
  215. LIST_REMOVE(monitoredItem, listEntry);
  216. UA_String_deleteMembers(&monitoredItem->indexRange);
  217. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  218. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  219. UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
  220. /* No actual callback, just remove the structure */
  221. monitoredItem->delayedFreePointers.callback = NULL;
  222. UA_WorkQueue_enqueueDelayed(&server->workQueue, &monitoredItem->delayedFreePointers);
  223. }
  224. UA_StatusCode
  225. UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
  226. /* Assert: The eventoverflow are counted in the queue size; There can be
  227. * only one eventoverflow more than normal entries */
  228. UA_assert(mon->queueSize >= mon->eventOverflows);
  229. UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);
  230. /* Nothing to do */
  231. if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
  232. return UA_STATUSCODE_GOOD;
  233. #ifdef __clang_analyzer__
  234. return UA_STATUSCODE_GOOD;
  235. #endif
  236. /* Remove notifications until the queue size is reached */
  237. UA_Subscription *sub = mon->subscription;
  238. while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
  239. /* At least two notifications that are not eventOverflows in the queue */
  240. UA_assert(mon->queueSize - mon->eventOverflows >= 2);
  241. /* Select the next notification to delete. Skip over overflow events. */
  242. UA_Notification *del = NULL;
  243. if(mon->discardOldest) {
  244. /* Remove the oldest */
  245. del = TAILQ_FIRST(&mon->queue);
  246. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  247. while(UA_Notification_isOverflowEvent(server, del))
  248. del = TAILQ_NEXT(del, listEntry); /* skip overflow events */
  249. #endif
  250. } else {
  251. /* Remove the second newest (to keep the up-to-date notification).
  252. * The last entry is not an OverflowEvent -- we just added it. */
  253. del = TAILQ_LAST(&mon->queue, NotificationQueue);
  254. del = TAILQ_PREV(del, NotificationQueue, listEntry);
  255. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  256. while(UA_Notification_isOverflowEvent(server, del))
  257. del = TAILQ_PREV(del, NotificationQueue, listEntry); /* skip overflow events */
  258. #endif
  259. }
  260. UA_assert(del); /* There must have been one entry that can be deleted */
  261. /* If reporting is activated (entries are also in the subscriptions
  262. * global queue): Move the entry after del in the per-MonitoredItem
  263. * queue right after del in the global queue. (It is already right after
  264. * del in the per-MonitoredItem queue.) This is required so we don't
  265. * starve MonitoredItems with a high sampling interval by always
  266. * removing their first appearance in the gloal queue for the
  267. * Subscription. */
  268. if(TAILQ_NEXT(del, globalEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
  269. UA_Notification *after_del = TAILQ_NEXT(del, listEntry);
  270. UA_assert(after_del); /* There must be one remaining element after del */
  271. TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
  272. TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
  273. }
  274. /* Delete the notification */
  275. UA_Notification_dequeue(server, del);
  276. UA_Notification_delete(del);
  277. }
  278. /* Get the element where the overflow shall be announced (infobits or
  279. * overflowevent) */
  280. UA_Notification *indicator;
  281. if(mon->discardOldest)
  282. indicator = TAILQ_FIRST(&mon->queue);
  283. else
  284. indicator = TAILQ_LAST(&mon->queue, NotificationQueue);
  285. UA_assert(indicator);
  286. /* Create an overflow notification */
  287. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  288. if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
  289. return createEventOverflowNotification(server, sub, mon, indicator);
  290. } else
  291. #endif
  292. {
  293. /* Set the infobits of a datachange notification */
  294. if(mon->maxQueueSize > 1) {
  295. /* Add the infobits either to the newest or the new last entry */
  296. indicator->data.value.hasStatus = true;
  297. indicator->data.value.status |=
  298. (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
  299. }
  300. }
  301. return UA_STATUSCODE_GOOD;
  302. }
  303. UA_StatusCode
  304. UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  305. UA_LOCK_ASSERT(server->serviceMutex, 1);
  306. if(mon->sampleCallbackIsRegistered)
  307. return UA_STATUSCODE_GOOD;
  308. /* Only DataChange MonitoredItems have a callback with a sampling interval */
  309. if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER)
  310. return UA_STATUSCODE_GOOD;
  311. UA_StatusCode retval =
  312. addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
  313. mon, mon->samplingInterval, &mon->sampleCallbackId);
  314. if(retval == UA_STATUSCODE_GOOD)
  315. mon->sampleCallbackIsRegistered = true;
  316. return retval;
  317. }
  318. void
  319. UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  320. UA_LOCK_ASSERT(server->serviceMutex, 1);
  321. if(!mon->sampleCallbackIsRegistered)
  322. return;
  323. removeCallback(server, mon->sampleCallbackId);
  324. mon->sampleCallbackIsRegistered = false;
  325. }
  326. #endif /* UA_ENABLE_SUBSCRIPTIONS */