ua_subscription_monitoreditem.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
  8. * Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2018 (c) Fabian Arndt, Root-Core
  10. */
  11. #include "ua_server_internal.h"
  12. #include "ua_subscription.h"
  13. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  14. void
  15. UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
  16. memset(mon, 0, sizeof(UA_MonitoredItem));
  17. mon->subscription = sub;
  18. TAILQ_INIT(&mon->queue);
  19. }
  20. void
  21. UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  22. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  23. /* Remove the sampling callback */
  24. UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
  25. } else if (monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  26. /* TODO: Access val data.event */
  27. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  28. "MonitoredItemTypes other than ChangeNotify or EventNotify "
  29. "are not supported yet");
  30. }
  31. /* Remove the queued notifications if attached to a subscription */
  32. if(monitoredItem->subscription) {
  33. UA_Subscription *sub = monitoredItem->subscription;
  34. UA_Notification *notification, *notification_tmp;
  35. TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
  36. listEntry, notification_tmp) {
  37. /* Remove the item from the queues and free the memory */
  38. UA_Notification_delete(sub, monitoredItem, notification);
  39. }
  40. }
  41. /* if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY)
  42. * -> UA_DataChangeFilter does not hold dynamic content we need to free */
  43. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  44. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  45. /* Remove the monitored item from the node queue */
  46. UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
  47. UA_MonitoredItem_removeNodeEventCallback, monitoredItem);
  48. /* Delete the event filter */
  49. UA_EventFilter_deleteMembers(&monitoredItem->filter.eventFilter);
  50. }
  51. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  52. /* Deregister MonitoredItem in userland */
  53. if(server->config.monitoredItemRegisterCallback && monitoredItem->registered) {
  54. /* Get the session context. Local MonitoredItems don't have a subscription. */
  55. UA_Session *session = NULL;
  56. if(monitoredItem->subscription)
  57. session = monitoredItem->subscription->session;
  58. if(!session)
  59. session = &server->adminSession;
  60. /* Get the node context */
  61. void *targetContext = NULL;
  62. UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &targetContext);
  63. /* Deregister */
  64. server->config.monitoredItemRegisterCallback(server, &session->sessionId,
  65. session->sessionHandle, &monitoredItem->monitoredNodeId,
  66. targetContext, monitoredItem->attributeId, true);
  67. }
  68. /* Remove the monitored item */
  69. if(monitoredItem->listEntry.le_prev != NULL)
  70. LIST_REMOVE(monitoredItem, listEntry);
  71. UA_String_deleteMembers(&monitoredItem->indexRange);
  72. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  73. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  74. UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
  75. /* No actual callback, just remove the structure */
  76. monitoredItem->delayedFreePointers.callback = NULL;
  77. UA_WorkQueue_enqueueDelayed(&server->workQueue, &monitoredItem->delayedFreePointers);
  78. }
  79. UA_StatusCode
  80. UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
  81. if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
  82. return UA_STATUSCODE_GOOD;
  83. /* Remove notifications until the queue size is reached */
  84. UA_Subscription *sub = mon->subscription;
  85. while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
  86. UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
  87. /* Make sure that the MonitoredItem does not lose its place in the
  88. * global queue when notifications are removed. Otherwise the
  89. * MonitoredItem can "starve" itself by putting new notifications always
  90. * at the end of the global queue and removing the old ones.
  91. *
  92. * - If the oldest notification is removed, put the second oldest
  93. * notification right behind it.
  94. * - If the newest notification is removed, put the new notification
  95. * right behind it. */
  96. UA_Notification *del; /* The notification that will be deleted */
  97. UA_Notification *after_del; /* The notification to keep and move after del */
  98. if(mon->discardOldest) {
  99. /* Remove the oldest */
  100. del = TAILQ_FIRST(&mon->queue);
  101. after_del = TAILQ_NEXT(del, listEntry);
  102. } else {
  103. /* Remove the second newest (to keep the up-to-date notification) */
  104. after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
  105. del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
  106. }
  107. /* Move after_del right after del in the global queue */
  108. TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
  109. TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
  110. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  111. /* Create an overflow notification */
  112. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  113. /* check if an overflowEvent is being deleted
  114. * TODO: make sure overflowEvents are never deleted */
  115. UA_NodeId overflowBaseId = UA_NODEID_NUMERIC(0, UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE);
  116. UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
  117. /* Check if an OverflowEvent is being deleted */
  118. if(del->data.event.fields.eventFieldsSize == 1 &&
  119. del->data.event.fields.eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
  120. isNodeInTree(&server->config.nodestore,
  121. (UA_NodeId*)del->data.event.fields.eventFields[0].data,
  122. &overflowBaseId, &subtypeId, 1)) {
  123. /* Don't do anything, since adding and removing an overflow will not change anything */
  124. return UA_STATUSCODE_GOOD;
  125. }
  126. /* cause an overflowEvent */
  127. /* an overflowEvent does not care about event filters and as such
  128. * will not be "triggered" correctly. Instead, a notification will
  129. * be inserted into the queue which includes only the nodeId of the
  130. * overflowEventType. It is up to the client to check for possible
  131. * overflows. */
  132. UA_Notification *overflowNotification = (UA_Notification*)UA_malloc(sizeof(UA_Notification));
  133. if(!overflowNotification)
  134. return UA_STATUSCODE_BADOUTOFMEMORY;
  135. UA_EventFieldList_init(&overflowNotification->data.event.fields);
  136. overflowNotification->data.event.fields.eventFields = UA_Variant_new();
  137. if(!overflowNotification->data.event.fields.eventFields) {
  138. UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
  139. UA_free(overflowNotification);
  140. return UA_STATUSCODE_BADOUTOFMEMORY;
  141. }
  142. overflowNotification->data.event.fields.eventFieldsSize = 1;
  143. UA_StatusCode retval =
  144. UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
  145. &overflowId, &UA_TYPES[UA_TYPES_NODEID]);
  146. if (retval != UA_STATUSCODE_GOOD) {
  147. UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
  148. UA_free(overflowNotification);
  149. return retval;
  150. }
  151. overflowNotification->mon = mon;
  152. if(mon->discardOldest) {
  153. TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry);
  154. TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue,
  155. overflowNotification, globalEntry);
  156. } else {
  157. TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry);
  158. TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue,
  159. overflowNotification, globalEntry);
  160. }
  161. /* The amount of notifications in the subscription don't change. The specification
  162. * only states that the queue size in each MonitoredItem isn't affected by OverflowEvents.
  163. * Since they are reduced in Notification_delete the queues are increased here, so they
  164. * will remain the same in the end.
  165. */
  166. ++sub->notificationQueueSize;
  167. ++sub->eventNotifications;
  168. }
  169. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  170. /* Delete the notification. This also removes the notification from the
  171. * linked lists. */
  172. UA_Notification_delete(sub, mon, del);
  173. }
  174. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  175. /* Get the element that carries the infobits */
  176. UA_Notification *notification = NULL;
  177. if(mon->discardOldest)
  178. notification = TAILQ_FIRST(&mon->queue);
  179. else
  180. notification = TAILQ_LAST(&mon->queue, NotificationQueue);
  181. UA_assert(notification);
  182. if(mon->maxQueueSize > 1) {
  183. /* Add the infobits either to the newest or the new last entry */
  184. notification->data.value.hasStatus = true;
  185. notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
  186. UA_STATUSCODE_INFOBITS_OVERFLOW);
  187. } else {
  188. /* If the queue size is reduced to one, remove the infobits */
  189. notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
  190. UA_STATUSCODE_INFOBITS_OVERFLOW);
  191. }
  192. }
  193. /* TODO: Infobits for Events? */
  194. return UA_STATUSCODE_GOOD;
  195. }
  196. UA_StatusCode
  197. UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  198. if(mon->sampleCallbackIsRegistered)
  199. return UA_STATUSCODE_GOOD;
  200. /* Only DataChange MonitoredItems have a callback with a sampling interval */
  201. if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)
  202. return UA_STATUSCODE_GOOD;
  203. UA_StatusCode retval =
  204. UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
  205. mon, (UA_UInt32)mon->samplingInterval, &mon->sampleCallbackId);
  206. if(retval == UA_STATUSCODE_GOOD)
  207. mon->sampleCallbackIsRegistered = true;
  208. return retval;
  209. }
  210. UA_StatusCode
  211. UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  212. if(!mon->sampleCallbackIsRegistered)
  213. return UA_STATUSCODE_GOOD;
  214. mon->sampleCallbackIsRegistered = false;
  215. return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
  216. }
  217. #endif /* UA_ENABLE_SUBSCRIPTIONS */