ua_subscription_monitoreditem.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
  8. * Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2018 (c) Fabian Arndt, Root-Core
  10. */
  11. #include "ua_server_internal.h"
  12. #include "ua_subscription.h"
  13. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  14. void
  15. UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
  16. memset(mon, 0, sizeof(UA_MonitoredItem));
  17. mon->subscription = sub;
  18. TAILQ_INIT(&mon->queue);
  19. }
  20. void
  21. UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  22. UA_Subscription *sub = monitoredItem->subscription;
  23. UA_LOG_INFO_SESSION(server->config.logger, sub->session,
  24. "Subscription %u | MonitoredItem %i | "
  25. "Delete the MonitoredItem", sub->subscriptionId,
  26. monitoredItem->monitoredItemId);
  27. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  28. /* Remove the sampling callback */
  29. UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
  30. } else if (monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  31. /* TODO: Access val data.event */
  32. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  33. "MonitoredItemTypes other than ChangeNotify or EventNotify "
  34. "are not supported yet");
  35. }
  36. /* Remove the queued notifications if attached to a subscription */
  37. if(monitoredItem->subscription) {
  38. UA_Notification *notification, *notification_tmp;
  39. TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
  40. listEntry, notification_tmp) {
  41. /* Remove the item from the queues and free the memory */
  42. UA_Notification_delete(sub, monitoredItem, notification);
  43. }
  44. }
  45. /* if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY)
  46. * -> UA_DataChangeFilter does not hold dynamic content we need to free */
  47. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  48. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  49. /* Remove the monitored item from the node queue */
  50. UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
  51. UA_MonitoredItem_removeNodeEventCallback, monitoredItem);
  52. /* Delete the event filter */
  53. UA_EventFilter_deleteMembers(&monitoredItem->filter.eventFilter);
  54. }
  55. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  56. /* Deregister MonitoredItem in userland */
  57. if(server->config.monitoredItemRegisterCallback && monitoredItem->registered) {
  58. /* Get the session context. Local MonitoredItems don't have a subscription. */
  59. UA_Session *session = NULL;
  60. if(monitoredItem->subscription)
  61. session = monitoredItem->subscription->session;
  62. if(!session)
  63. session = &server->adminSession;
  64. /* Get the node context */
  65. void *targetContext = NULL;
  66. UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &targetContext);
  67. /* Deregister */
  68. server->config.monitoredItemRegisterCallback(server, &session->sessionId,
  69. session->sessionHandle, &monitoredItem->monitoredNodeId,
  70. targetContext, monitoredItem->attributeId, true);
  71. }
  72. /* Remove the monitored item */
  73. if(monitoredItem->listEntry.le_prev != NULL)
  74. LIST_REMOVE(monitoredItem, listEntry);
  75. UA_String_deleteMembers(&monitoredItem->indexRange);
  76. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  77. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  78. UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
  79. /* No actual callback, just remove the structure */
  80. monitoredItem->delayedFreePointers.callback = NULL;
  81. UA_WorkQueue_enqueueDelayed(&server->workQueue, &monitoredItem->delayedFreePointers);
  82. }
  83. UA_StatusCode
  84. UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
  85. if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
  86. return UA_STATUSCODE_GOOD;
  87. /* Remove notifications until the queue size is reached */
  88. UA_Subscription *sub = mon->subscription;
  89. while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
  90. UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
  91. /* Make sure that the MonitoredItem does not lose its place in the
  92. * global queue when notifications are removed. Otherwise the
  93. * MonitoredItem can "starve" itself by putting new notifications always
  94. * at the end of the global queue and removing the old ones.
  95. *
  96. * - If the oldest notification is removed, put the second oldest
  97. * notification right behind it.
  98. * - If the newest notification is removed, put the new notification
  99. * right behind it. */
  100. UA_Notification *del; /* The notification that will be deleted */
  101. UA_Notification *after_del; /* The notification to keep and move after del */
  102. if(mon->discardOldest) {
  103. /* Remove the oldest */
  104. del = TAILQ_FIRST(&mon->queue);
  105. after_del = TAILQ_NEXT(del, listEntry);
  106. } else {
  107. /* Remove the second newest (to keep the up-to-date notification) */
  108. after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
  109. del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
  110. }
  111. /* Move after_del right after del in the global queue */
  112. TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
  113. TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
  114. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  115. /* Check if an OverflowEvent is being deleted */
  116. /* TODO: EventOverflows should never be deleted */
  117. UA_NodeId overflowBaseId = UA_NODEID_NUMERIC(0, UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE);
  118. if(del->data.event.fields.eventFieldsSize == 1 &&
  119. del->data.event.fields.eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
  120. isNodeInTree(&server->config.nodestore,
  121. (UA_NodeId*)del->data.event.fields.eventFields[0].data,
  122. &overflowBaseId, &subtypeId, 1)) {
  123. --mon->eventOverflows;
  124. }
  125. #endif
  126. /* Delete the notification. This also removes the notification from the
  127. * linked lists. */
  128. UA_Notification_delete(sub, mon, del);
  129. }
  130. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  131. /* Create an overflow notification */
  132. /* The specification states in Part 4 5.12.1.5 that an EventQueueOverflowEvent
  133. * "is generated when the first Event has to be discarded [...] without discarding
  134. * any other event". So only generate one for all deleted events. */
  135. if (mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  136. /* check if an overflowEvent is being deleted
  137. * TODO: make sure overflowEvents are never deleted */
  138. UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
  139. /* an overflowEvent does not care about event filters and as such
  140. * will not be "triggered" correctly. Instead, a notification will
  141. * be inserted into the queue which includes only the nodeId of the
  142. * overflowEventType. It is up to the client to check for possible
  143. * overflows. */
  144. UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification));
  145. if (!overflowNotification)
  146. return UA_STATUSCODE_BADOUTOFMEMORY;
  147. UA_EventFieldList_init(&overflowNotification->data.event.fields);
  148. overflowNotification->data.event.fields.eventFields = UA_Variant_new();
  149. if (!overflowNotification->data.event.fields.eventFields) {
  150. UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
  151. UA_free(overflowNotification);
  152. return UA_STATUSCODE_BADOUTOFMEMORY;
  153. }
  154. overflowNotification->data.event.fields.eventFieldsSize = 1;
  155. UA_StatusCode retval =
  156. UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
  157. &overflowId, &UA_TYPES[UA_TYPES_NODEID]);
  158. if (retval != UA_STATUSCODE_GOOD) {
  159. UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
  160. UA_free(overflowNotification);
  161. return retval;
  162. }
  163. overflowNotification->mon = mon;
  164. /* The amount of notifications in the subscription don't change. The specification
  165. * only states that the queue size in each MonitoredItem isn't affected by OverflowEvents.
  166. * (In this case the queue in the MonitoredItemQueue IS affected internally because externally
  167. * the queueSize will always appear with eventOverflows subtracted from it)
  168. *
  169. * Since they are reduced in Notification_delete the queues are increased here, so they
  170. * will remain the same in the end.
  171. *
  172. * Do not use Notification_enqueue to insert the notification into the queues, since this would
  173. * cause a bad recursive call of this function.
  174. */
  175. if (mon->discardOldest) {
  176. TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry);
  177. TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue,
  178. overflowNotification, globalEntry);
  179. } else {
  180. TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry);
  181. TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue,
  182. overflowNotification, globalEntry);
  183. }
  184. ++mon->eventOverflows;
  185. ++mon->queueSize;
  186. ++sub->notificationQueueSize;
  187. ++sub->eventNotifications;
  188. }
  189. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  190. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  191. /* Get the element that carries the infobits */
  192. UA_Notification *notification = NULL;
  193. if(mon->discardOldest)
  194. notification = TAILQ_FIRST(&mon->queue);
  195. else
  196. notification = TAILQ_LAST(&mon->queue, NotificationQueue);
  197. UA_assert(notification);
  198. if(mon->maxQueueSize > 1) {
  199. /* Add the infobits either to the newest or the new last entry */
  200. notification->data.value.hasStatus = true;
  201. notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
  202. UA_STATUSCODE_INFOBITS_OVERFLOW);
  203. } else {
  204. /* If the queue size is reduced to one, remove the infobits */
  205. notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
  206. UA_STATUSCODE_INFOBITS_OVERFLOW);
  207. }
  208. }
  209. /* TODO: Infobits for Events? */
  210. return UA_STATUSCODE_GOOD;
  211. }
  212. UA_StatusCode
  213. UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  214. if(mon->sampleCallbackIsRegistered)
  215. return UA_STATUSCODE_GOOD;
  216. /* Only DataChange MonitoredItems have a callback with a sampling interval */
  217. if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)
  218. return UA_STATUSCODE_GOOD;
  219. UA_StatusCode retval =
  220. UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
  221. mon, (UA_UInt32)mon->samplingInterval, &mon->sampleCallbackId);
  222. if(retval == UA_STATUSCODE_GOOD)
  223. mon->sampleCallbackIsRegistered = true;
  224. return retval;
  225. }
  226. UA_StatusCode
  227. UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  228. if(!mon->sampleCallbackIsRegistered)
  229. return UA_STATUSCODE_GOOD;
  230. mon->sampleCallbackIsRegistered = false;
  231. return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
  232. }
  233. #endif /* UA_ENABLE_SUBSCRIPTIONS */