ua_subscription_monitoreditem.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
  8. * Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2018 (c) Fabian Arndt, Root-Core
  10. */
  11. #include "ua_server_internal.h"
  12. #include "ua_subscription.h"
  13. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  14. /****************/
  15. /* Notification */
  16. /****************/
  17. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  18. static const UA_NodeId overflowEventType =
  19. {0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE}};
  20. static const UA_NodeId simpleOverflowEventType =
  21. {0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE}};
  22. static UA_Boolean
  23. UA_Notification_isOverflowEvent(UA_Server *server, UA_Notification *n) {
  24. UA_MonitoredItem *mon = n->mon;
  25. if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY)
  26. return false;
  27. UA_EventFieldList *efl = &n->data.event.fields;
  28. if(efl->eventFieldsSize == 1 &&
  29. efl->eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
  30. isNodeInTree(&server->config.nodestore,
  31. (const UA_NodeId *)efl->eventFields[0].data,
  32. &overflowEventType, &subtypeId, 1)) {
  33. return true;
  34. }
  35. return false;
  36. }
  37. static UA_Notification *
  38. createEventOverflowNotification(UA_MonitoredItem *mon) {
  39. UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification));
  40. if(!overflowNotification)
  41. return NULL;
  42. overflowNotification->mon = mon;
  43. UA_EventFieldList_init(&overflowNotification->data.event.fields);
  44. overflowNotification->data.event.fields.eventFields = UA_Variant_new();
  45. if(!overflowNotification->data.event.fields.eventFields) {
  46. UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
  47. UA_free(overflowNotification);
  48. return NULL;
  49. }
  50. overflowNotification->data.event.fields.eventFieldsSize = 1;
  51. UA_StatusCode retval =
  52. UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
  53. &simpleOverflowEventType, &UA_TYPES[UA_TYPES_NODEID]);
  54. if(retval != UA_STATUSCODE_GOOD) {
  55. UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
  56. UA_free(overflowNotification);
  57. return NULL;
  58. }
  59. return overflowNotification;
  60. }
  61. #endif
  62. void
  63. UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
  64. UA_MonitoredItem *mon, UA_Notification *n) {
  65. /* Add to the MonitoredItem */
  66. TAILQ_INSERT_TAIL(&mon->queue, n, listEntry);
  67. ++mon->queueSize;
  68. /* Add to the subscription */
  69. TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
  70. ++sub->notificationQueueSize;
  71. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  72. ++sub->dataChangeNotifications;
  73. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  74. } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  75. ++sub->eventNotifications;
  76. if(UA_Notification_isOverflowEvent(server, n))
  77. ++mon->eventOverflows;
  78. #endif
  79. } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
  80. ++sub->statusChangeNotifications;
  81. }
  82. /* Ensure enough space is available in the MonitoredItem. Do this only after
  83. * adding the new Notification. */
  84. UA_MonitoredItem_ensureQueueSpace(server, mon);
  85. }
  86. void
  87. UA_Notification_dequeue(UA_Server *server, UA_Notification *n) {
  88. UA_MonitoredItem *mon = n->mon;
  89. UA_Subscription *sub = mon->subscription;
  90. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  91. --sub->dataChangeNotifications;
  92. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  93. } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  94. --sub->eventNotifications;
  95. if(UA_Notification_isOverflowEvent(server, n))
  96. --mon->eventOverflows;
  97. #endif
  98. } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
  99. --sub->statusChangeNotifications;
  100. }
  101. TAILQ_REMOVE(&mon->queue, n, listEntry);
  102. --mon->queueSize;
  103. TAILQ_REMOVE(&sub->notificationQueue, n, globalEntry);
  104. --sub->notificationQueueSize;
  105. }
  106. void
  107. UA_Notification_delete(UA_Notification *n) {
  108. UA_MonitoredItem *mon = n->mon;
  109. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  110. UA_DataValue_deleteMembers(&n->data.value);
  111. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  112. } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  113. UA_EventFieldList_deleteMembers(&n->data.event.fields);
  114. /* EventFilterResult currently isn't being used
  115. * UA_EventFilterResult_delete(notification->data.event->result); */
  116. #endif
  117. } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
  118. /* Nothing to do */
  119. }
  120. UA_free(n);
  121. }
  122. /*****************/
  123. /* MonitoredItem */
  124. /*****************/
  125. void
  126. UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
  127. memset(mon, 0, sizeof(UA_MonitoredItem));
  128. mon->subscription = sub;
  129. TAILQ_INIT(&mon->queue);
  130. }
  131. void
  132. UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  133. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  134. /* Remove the sampling callback */
  135. UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
  136. } else if (monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  137. /* TODO: Access val data.event */
  138. UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
  139. "MonitoredItemTypes other than ChangeNotify or EventNotify "
  140. "are not supported yet");
  141. }
  142. /* Remove the queued notifications if attached to a subscription (not a
  143. * local MonitoredItem) */
  144. if(monitoredItem->subscription) {
  145. UA_Notification *notification, *notification_tmp;
  146. TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
  147. listEntry, notification_tmp) {
  148. /* Remove the item from the queues and free the memory */
  149. UA_Notification_dequeue(server, notification);
  150. UA_Notification_delete(notification);
  151. }
  152. }
  153. /* if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY)
  154. * -> UA_DataChangeFilter does not hold dynamic content we need to free */
  155. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  156. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  157. /* Remove the monitored item from the node queue */
  158. UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
  159. UA_MonitoredItem_removeNodeEventCallback, monitoredItem);
  160. /* Delete the event filter */
  161. UA_EventFilter_deleteMembers(&monitoredItem->filter.eventFilter);
  162. }
  163. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  164. /* Deregister MonitoredItem in userland */
  165. if(server->config.monitoredItemRegisterCallback && monitoredItem->registered) {
  166. /* Get the session context. Local MonitoredItems don't have a subscription. */
  167. UA_Session *session = NULL;
  168. if(monitoredItem->subscription)
  169. session = monitoredItem->subscription->session;
  170. if(!session)
  171. session = &server->adminSession;
  172. /* Get the node context */
  173. void *targetContext = NULL;
  174. UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &targetContext);
  175. /* Deregister */
  176. server->config.monitoredItemRegisterCallback(server, &session->sessionId,
  177. session->sessionHandle, &monitoredItem->monitoredNodeId,
  178. targetContext, monitoredItem->attributeId, true);
  179. }
  180. /* Remove the monitored item */
  181. if(monitoredItem->listEntry.le_prev != NULL)
  182. LIST_REMOVE(monitoredItem, listEntry);
  183. UA_String_deleteMembers(&monitoredItem->indexRange);
  184. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  185. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  186. UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
  187. /* No actual callback, just remove the structure */
  188. monitoredItem->delayedFreePointers.callback = NULL;
  189. UA_WorkQueue_enqueueDelayed(&server->workQueue, &monitoredItem->delayedFreePointers);
  190. }
  191. #ifdef __clang_analyzer__
  192. # define UA_CA_assert(clause) UA_assert(clause)
  193. #else
  194. # define UA_CA_assert(clause)
  195. #endif
  196. UA_StatusCode
  197. UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
  198. if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
  199. return UA_STATUSCODE_GOOD;
  200. /* Remove notifications until the queue size is reached */
  201. UA_Subscription *sub = mon->subscription;
  202. #ifdef __clang_analyzer__
  203. UA_Notification *last = NULL;
  204. #endif
  205. while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
  206. /* At least two notifications that are not eventOverflows in the queue */
  207. UA_assert(mon->queueSize - mon->eventOverflows >= 2);
  208. /* Select the next notification to delete. Skip over overflow events. */
  209. UA_Notification *del;
  210. if(mon->discardOldest) {
  211. /* Remove the oldest */
  212. del = TAILQ_FIRST(&mon->queue);
  213. UA_CA_assert(del != last);
  214. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  215. while(UA_Notification_isOverflowEvent(server, del)) {
  216. del = TAILQ_NEXT(del, listEntry); /* skip overflow events */
  217. UA_CA_assert(del != last);
  218. }
  219. #endif
  220. } else {
  221. /* Remove the second newest (to keep the up-to-date notification) */
  222. del = TAILQ_LAST(&mon->queue, NotificationQueue);
  223. del = TAILQ_PREV(del, NotificationQueue, listEntry);
  224. UA_CA_assert(del != last);
  225. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  226. while(UA_Notification_isOverflowEvent(server, del)) {
  227. del = TAILQ_PREV(del, NotificationQueue, listEntry); /* skip overflow events */
  228. UA_CA_assert(del != last);
  229. }
  230. #endif
  231. }
  232. UA_assert(del && del->mon == mon);
  233. /* Move after_del right after del in the global queue. (It is already
  234. * right after del in the per-MonitoredItem queue.) This is required so
  235. * we don't starve MonitoredItems with a high sampling interval by
  236. * always removing their first appearance in the gloal queue for the
  237. * Subscription. */
  238. UA_Notification *after_del = TAILQ_NEXT(del, listEntry);
  239. UA_CA_assert(after_del != last);
  240. if(after_del) {
  241. TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
  242. TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
  243. }
  244. #ifdef __clang_analyzer__
  245. last = del;
  246. #endif
  247. /* Delete the notification */
  248. UA_Notification_dequeue(server, del);
  249. UA_Notification_delete(del);
  250. }
  251. /* Get the element where the overflow shall be announced (infobits or
  252. * overflowevent) */
  253. UA_Notification *indicator;
  254. if(mon->discardOldest)
  255. indicator = TAILQ_FIRST(&mon->queue);
  256. else
  257. indicator = TAILQ_LAST(&mon->queue, NotificationQueue);
  258. UA_assert(indicator);
  259. UA_CA_assert(indicator != last);
  260. /* Create an overflow notification */
  261. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  262. /* The specification states in Part 4 5.12.1.5 that an EventQueueOverflowEvent
  263. * "is generated when the first Event has to be discarded [...] without discarding
  264. * any other event". So only generate one for all deleted events. */
  265. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  266. /* Avoid two redundant overflow events in a row */
  267. if(UA_Notification_isOverflowEvent(server, indicator)) {
  268. if(mon->discardOldest)
  269. return UA_STATUSCODE_GOOD;
  270. UA_Notification *prev = TAILQ_PREV(indicator, NotificationQueue, listEntry);
  271. UA_CA_assert(prev != last);
  272. if(prev && UA_Notification_isOverflowEvent(server, prev))
  273. return UA_STATUSCODE_GOOD;
  274. }
  275. /* A notification is inserted into the queue which includes only the
  276. * NodeId of the overflowEventType. It is up to the client to check for
  277. * possible overflows. */
  278. UA_Notification *overflowNotification = createEventOverflowNotification(mon);
  279. if(!overflowNotification)
  280. return UA_STATUSCODE_BADOUTOFMEMORY;
  281. /* Insert before the "indicator notification". This is either first in
  282. * the queue (if the oldest notification was removed) or before the new
  283. * event that remains the last element of the queue. */
  284. TAILQ_INSERT_BEFORE(indicator, overflowNotification, listEntry);
  285. TAILQ_INSERT_BEFORE(indicator, overflowNotification, globalEntry);
  286. ++mon->eventOverflows;
  287. ++mon->queueSize;
  288. ++sub->notificationQueueSize;
  289. ++sub->eventNotifications;
  290. }
  291. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  292. /* Set the infobits of a datachange notification */
  293. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  294. /* Set the infobits */
  295. if(mon->maxQueueSize > 1) {
  296. /* Add the infobits either to the newest or the new last entry */
  297. indicator->data.value.hasStatus = true;
  298. indicator->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
  299. UA_STATUSCODE_INFOBITS_OVERFLOW);
  300. } else {
  301. /* If the queue size is reduced to one, remove the infobits */
  302. indicator->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
  303. UA_STATUSCODE_INFOBITS_OVERFLOW);
  304. }
  305. }
  306. return UA_STATUSCODE_GOOD;
  307. }
  308. UA_StatusCode
  309. UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  310. if(mon->sampleCallbackIsRegistered)
  311. return UA_STATUSCODE_GOOD;
  312. /* Only DataChange MonitoredItems have a callback with a sampling interval */
  313. if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)
  314. return UA_STATUSCODE_GOOD;
  315. UA_StatusCode retval =
  316. UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
  317. mon, mon->samplingInterval, &mon->sampleCallbackId);
  318. if(retval == UA_STATUSCODE_GOOD)
  319. mon->sampleCallbackIsRegistered = true;
  320. return retval;
  321. }
  322. void
  323. UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  324. if(!mon->sampleCallbackIsRegistered)
  325. return;
  326. UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
  327. mon->sampleCallbackIsRegistered = false;
  328. }
  329. #endif /* UA_ENABLE_SUBSCRIPTIONS */