ua_subscription_datachange.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
  8. * Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
  9. */
  10. #include "ua_server_internal.h"
  11. #include "ua_subscription.h"
  12. #include "ua_types_encoding_binary.h"
  13. #include "ua_subscription_events.h"
  14. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  15. #define UA_VALUENCODING_MAXSTACK 512
  16. void UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
  17. memset(mon, 0, sizeof(UA_MonitoredItem));
  18. mon->subscription = sub;
  19. TAILQ_INIT(&mon->queue);
  20. }
  21. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  22. static UA_StatusCode removeMonitoredItemFromNodeCallback(UA_Server *server, UA_Session *session, UA_Node *node,
  23. void *data) {
  24. /* data is the monitoredItemID */
  25. /* catch edge case that it's the first element */
  26. if (data == ((UA_ObjectNode *) node)->monitoredItemQueue) {
  27. ((UA_ObjectNode *)node)->monitoredItemQueue = ((UA_MonitoredItem *)data)->next;
  28. return UA_STATUSCODE_GOOD;
  29. }
  30. /* SLIST_FOREACH */
  31. for (UA_MonitoredItem *entry = ((UA_ObjectNode *) node)->monitoredItemQueue->next;
  32. entry != NULL; entry=entry->next) {
  33. if (entry == (UA_MonitoredItem *)data) {
  34. /* SLIST_REMOVE */
  35. UA_MonitoredItem *iter = ((UA_ObjectNode *) node)->monitoredItemQueue;
  36. for (; iter->next != entry; iter=iter->next) {}
  37. iter->next = entry->next;
  38. UA_free(entry);
  39. break;
  40. }
  41. }
  42. return UA_STATUSCODE_GOOD;
  43. }
  44. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  45. void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  46. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  47. /* Remove the sampling callback */
  48. UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
  49. } else if (monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  50. /* TODO: Access val data.event */
  51. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  52. "MonitoredItemTypes other than ChangeNotify or EventNotify are not supported yet");
  53. }
  54. /* Remove the queued notifications if attached to a subscription */
  55. if(monitoredItem->subscription) {
  56. UA_Subscription *sub = monitoredItem->subscription;
  57. UA_Notification *notification, *notification_tmp;
  58. TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
  59. listEntry, notification_tmp) {
  60. /* Remove the item from the queues */
  61. TAILQ_REMOVE(&monitoredItem->queue, notification, listEntry);
  62. TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
  63. --sub->notificationQueueSize;
  64. /*
  65. if (monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  66. EventFilterResult currently isn't being used
  67. UA_EventFilterResult_delete(notification->data.event->result);
  68. }
  69. */
  70. UA_Notification_delete(notification);
  71. }
  72. monitoredItem->queueSize = 0;
  73. }
  74. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  75. if (monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  76. /* Remove the monitored item from the node queue */
  77. UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId, removeMonitoredItemFromNodeCallback,
  78. monitoredItem);
  79. /* Delete the event filter */
  80. UA_EventFilter_deleteMembers(&monitoredItem->filter.eventFilter);
  81. }
  82. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  83. /* Remove the monitored item */
  84. UA_String_deleteMembers(&monitoredItem->indexRange);
  85. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  86. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  87. UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
  88. UA_Server_delayedFree(server, monitoredItem);
  89. }
  90. UA_StatusCode MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
  91. if(mon->queueSize <= mon->maxQueueSize)
  92. return UA_STATUSCODE_GOOD;
  93. /* Remove notifications until the queue size is reached */
  94. UA_Subscription *sub = mon->subscription;
  95. while(mon->queueSize > mon->maxQueueSize) {
  96. UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
  97. /* Make sure that the MonitoredItem does not lose its place in the
  98. * global queue when notifications are removed. Otherwise the
  99. * MonitoredItem can "starve" itself by putting new notifications always
  100. * at the end of the global queue and removing the old ones.
  101. *
  102. * - If the oldest notification is removed, put the second oldest
  103. * notification right behind it.
  104. * - If the newest notification is removed, put the new notification
  105. * right behind it. */
  106. UA_Notification *del; /* The notification that will be deleted */
  107. UA_Notification *after_del; /* The notification to keep and move after del */
  108. if(mon->discardOldest) {
  109. /* Remove the oldest */
  110. del = TAILQ_FIRST(&mon->queue);
  111. after_del = TAILQ_NEXT(del, listEntry);
  112. } else {
  113. /* Remove the second newest (to keep the up-to-date notification) */
  114. after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
  115. del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
  116. }
  117. /* Move after_del right after del in the global queue */
  118. TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
  119. TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
  120. /* Remove the notification from the queues */
  121. TAILQ_REMOVE(&mon->queue, del, listEntry);
  122. TAILQ_REMOVE(&sub->notificationQueue, del, globalEntry);
  123. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  124. /* TODO: provide additional protection for overflowEvents according to specification */
  125. /* removing an overflowEvent should not reduce the queueSize */
  126. UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
  127. if (!(del->data.event.fields.eventFieldsSize == 1
  128. && del->data.event.fields.eventFields->type == &UA_TYPES[UA_TYPES_NODEID]
  129. && UA_NodeId_equal((UA_NodeId *)del->data.event.fields.eventFields->data, &overflowId))) {
  130. --mon->queueSize;
  131. --sub->notificationQueueSize;
  132. }
  133. #else
  134. --mon->queueSize;
  135. --sub->notificationQueueSize;
  136. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  137. /* Free the notification */
  138. if (mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  139. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  140. /* EventFilterResult currently isn't being used
  141. UA_EventFilterResult_deleteMembers(&del->data.event->result); */
  142. UA_EventFieldList_deleteMembers(&del->data.event.fields);
  143. /* cause an overflowEvent */
  144. /* an overflowEvent does not care about event filters and as such will not be "triggered" correctly.
  145. * Instead, a notification will be inserted into the queue which includes only the nodeId of the
  146. * overflowEventType. It is up to the client to check for possible overflows.
  147. */
  148. UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification));
  149. if (!overflowNotification) {
  150. return UA_STATUSCODE_BADOUTOFMEMORY;
  151. }
  152. UA_EventFieldList_init(&overflowNotification->data.event.fields);
  153. overflowNotification->data.event.fields.eventFields = UA_Variant_new();
  154. if (!overflowNotification->data.event.fields.eventFields) {
  155. UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
  156. UA_free(overflowNotification);
  157. return UA_STATUSCODE_BADOUTOFMEMORY;
  158. }
  159. UA_Variant_init(overflowNotification->data.event.fields.eventFields);
  160. overflowNotification->data.event.fields.eventFieldsSize = 1;
  161. UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
  162. &overflowId, &UA_TYPES[UA_TYPES_NODEID]);
  163. overflowNotification->mon = mon;
  164. if (mon->discardOldest) {
  165. TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry);
  166. TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue, overflowNotification, globalEntry);
  167. } else {
  168. TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry);
  169. TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue, overflowNotification, globalEntry);
  170. }
  171. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  172. }
  173. UA_Notification_delete(del);
  174. }
  175. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  176. /* Get the element that carries the infobits */
  177. UA_Notification *notification = NULL;
  178. if(mon->discardOldest)
  179. notification = TAILQ_FIRST(&mon->queue);
  180. else
  181. notification = TAILQ_LAST(&mon->queue, NotificationQueue);
  182. UA_assert(notification);
  183. if(mon->maxQueueSize > 1) {
  184. /* Add the infobits either to the newest or the new last entry */
  185. notification->data.value.hasStatus = true;
  186. notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
  187. UA_STATUSCODE_INFOBITS_OVERFLOW);
  188. } else {
  189. /* If the queue size is reduced to one, remove the infobits */
  190. notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
  191. UA_STATUSCODE_INFOBITS_OVERFLOW);
  192. }
  193. }
  194. /* TODO: Infobits for Events? */
  195. return UA_STATUSCODE_GOOD;
  196. }
  197. #define ABS_SUBTRACT_TYPE_INDEPENDENT(a,b) ((a)>(b)?(a)-(b):(b)-(a))
  198. static UA_INLINE UA_Boolean
  199. outOfDeadBand(const void *data1, const void *data2, const size_t index, const UA_DataType *type, const UA_Double deadbandValue) {
  200. if (type == &UA_TYPES[UA_TYPES_SBYTE]) {
  201. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_SByte*)data1)[index], ((const UA_SByte*)data2)[index]) <= deadbandValue)
  202. return false;
  203. } else
  204. if (type == &UA_TYPES[UA_TYPES_BYTE]) {
  205. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Byte*)data1)[index], ((const UA_Byte*)data2)[index]) <= deadbandValue)
  206. return false;
  207. } else
  208. if (type == &UA_TYPES[UA_TYPES_INT16]) {
  209. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int16*)data1)[index], ((const UA_Int16*)data2)[index]) <= deadbandValue)
  210. return false;
  211. } else
  212. if (type == &UA_TYPES[UA_TYPES_UINT16]) {
  213. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt16*)data1)[index], ((const UA_UInt16*)data2)[index]) <= deadbandValue)
  214. return false;
  215. } else
  216. if (type == &UA_TYPES[UA_TYPES_INT32]) {
  217. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int32*)data1)[index], ((const UA_Int32*)data2)[index]) <= deadbandValue)
  218. return false;
  219. } else
  220. if (type == &UA_TYPES[UA_TYPES_UINT32]) {
  221. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt32*)data1)[index], ((const UA_UInt32*)data2)[index]) <= deadbandValue)
  222. return false;
  223. } else
  224. if (type == &UA_TYPES[UA_TYPES_INT64]) {
  225. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int64*)data1)[index], ((const UA_Int64*)data2)[index]) <= deadbandValue)
  226. return false;
  227. } else
  228. if (type == &UA_TYPES[UA_TYPES_UINT64]) {
  229. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt64*)data1)[index], ((const UA_UInt64*)data2)[index]) <= deadbandValue)
  230. return false;
  231. } else
  232. if (type == &UA_TYPES[UA_TYPES_FLOAT]) {
  233. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Float*)data1)[index], ((const UA_Float*)data2)[index]) <= deadbandValue)
  234. return false;
  235. } else
  236. if (type == &UA_TYPES[UA_TYPES_DOUBLE]) {
  237. if (ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Double*)data1)[index], ((const UA_Double*)data2)[index]) <= deadbandValue)
  238. return false;
  239. }
  240. return true;
  241. }
  242. static UA_INLINE UA_Boolean
  243. updateNeededForFilteredValue(const UA_Variant *value, const UA_Variant *oldValue, const UA_Double deadbandValue) {
  244. if (value->arrayLength != oldValue->arrayLength) {
  245. return true;
  246. }
  247. if (value->type != oldValue->type) {
  248. return true;
  249. }
  250. if (UA_Variant_isScalar(value)) {
  251. return outOfDeadBand(value->data, oldValue->data, 0, value->type, deadbandValue);
  252. } else {
  253. for (size_t i = 0; i < value->arrayLength; ++i) {
  254. if (outOfDeadBand(value->data, oldValue->data, i, value->type, deadbandValue))
  255. return true;
  256. }
  257. }
  258. return false;
  259. }
  260. /* When a change is detected, encoding contains the heap-allocated binary encoded value */
  261. static UA_Boolean
  262. detectValueChangeWithFilter(UA_Server *server, UA_MonitoredItem *mon, UA_DataValue *value,
  263. UA_ByteString *encoding) {
  264. UA_Session *session = &adminSession;
  265. UA_UInt32 subscriptionId = 0;
  266. UA_Subscription *sub = mon->subscription;
  267. if(sub) {
  268. session = sub->session;
  269. subscriptionId = sub->subscriptionId;
  270. }
  271. if(isDataTypeNumeric(value->value.type) &&
  272. (mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
  273. mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
  274. if(mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE) {
  275. if(!updateNeededForFilteredValue(&value->value, &mon->lastValue, mon->filter.dataChangeFilter.deadbandValue))
  276. return false;
  277. }
  278. /* else if (mon->filter.deadbandType == UA_DEADBANDTYPE_PERCENT) {
  279. // TODO where do this EURange come from ?
  280. UA_Double deadbandValue = fabs(mon->filter.deadbandValue * (EURange.high-EURange.low));
  281. if (!updateNeededForFilteredValue(value->value, mon->lastValue, deadbandValue))
  282. return false;
  283. }*/
  284. }
  285. /* Stack-allocate some memory for the value encoding. We might heap-allocate
  286. * more memory if needed. This is just enough for scalars and small
  287. * structures. */
  288. UA_STACKARRAY(UA_Byte, stackValueEncoding, UA_VALUENCODING_MAXSTACK);
  289. UA_ByteString valueEncoding;
  290. valueEncoding.data = stackValueEncoding;
  291. valueEncoding.length = UA_VALUENCODING_MAXSTACK;
  292. /* Encode the value */
  293. UA_Byte *bufPos = valueEncoding.data;
  294. const UA_Byte *bufEnd = &valueEncoding.data[valueEncoding.length];
  295. UA_StatusCode retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
  296. &bufPos, &bufEnd, NULL, NULL);
  297. if(retval == UA_STATUSCODE_BADENCODINGLIMITSEXCEEDED) {
  298. size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
  299. if(binsize == 0)
  300. return false;
  301. retval = UA_ByteString_allocBuffer(&valueEncoding, binsize);
  302. if(retval == UA_STATUSCODE_GOOD) {
  303. bufPos = valueEncoding.data;
  304. bufEnd = &valueEncoding.data[valueEncoding.length];
  305. retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
  306. &bufPos, &bufEnd, NULL, NULL);
  307. }
  308. }
  309. if(retval != UA_STATUSCODE_GOOD) {
  310. UA_LOG_WARNING_SESSION(server->config.logger, session,
  311. "Subscription %u | MonitoredItem %i | "
  312. "Could not encode the value the MonitoredItem with status %s",
  313. subscriptionId, mon->monitoredItemId, UA_StatusCode_name(retval));
  314. return false;
  315. }
  316. /* Has the value changed? */
  317. valueEncoding.length = (uintptr_t)bufPos - (uintptr_t)valueEncoding.data;
  318. UA_Boolean changed = (!mon->lastSampledValue.data ||
  319. !UA_String_equal(&valueEncoding, &mon->lastSampledValue));
  320. /* No change */
  321. if(!changed) {
  322. if(valueEncoding.data != stackValueEncoding)
  323. UA_ByteString_deleteMembers(&valueEncoding);
  324. return false;
  325. }
  326. /* Change detected. Copy encoding on the heap if necessary. */
  327. if(valueEncoding.data == stackValueEncoding) {
  328. retval = UA_ByteString_copy(&valueEncoding, encoding);
  329. if(retval != UA_STATUSCODE_GOOD) {
  330. UA_LOG_WARNING_SESSION(server->config.logger, session,
  331. "Subscription %u | MonitoredItem %i | "
  332. "Detected change, but could not allocate memory for the notification"
  333. "with status %s", subscriptionId, mon->monitoredItemId,
  334. UA_StatusCode_name(retval));
  335. return false;
  336. }
  337. return true;
  338. }
  339. *encoding = valueEncoding;
  340. return true;
  341. }
  342. /* Has this sample changed from the last one? The method may allocate additional
  343. * space for the encoding buffer. Detect the change in encoding->data. */
  344. static UA_Boolean
  345. detectValueChange(UA_Server *server, UA_MonitoredItem *mon,
  346. UA_DataValue value, UA_ByteString *encoding) {
  347. /* Apply Filter */
  348. if(mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS)
  349. value.hasValue = false;
  350. value.hasServerTimestamp = false;
  351. value.hasServerPicoseconds = false;
  352. if(mon->filter.dataChangeFilter.trigger < UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
  353. value.hasSourceTimestamp = false;
  354. value.hasSourcePicoseconds = false;
  355. }
  356. /* Detect the value change */
  357. return detectValueChangeWithFilter(server, mon, &value, encoding);
  358. }
  359. /* Returns whether the sample was stored in the MonitoredItem */
  360. static UA_Boolean
  361. sampleCallbackWithValue(UA_Server *server, UA_MonitoredItem *monitoredItem,
  362. UA_DataValue *value) {
  363. UA_assert(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY);
  364. UA_Subscription *sub = monitoredItem->subscription;
  365. /* Contains heap-allocated binary encoding of the value if a change was detected */
  366. UA_ByteString binaryEncoding = UA_BYTESTRING_NULL;
  367. /* Has the value changed? Allocates memory in binaryEncoding if necessary.
  368. * value is edited internally so we make a shallow copy. */
  369. UA_Boolean changed = detectValueChange(server, monitoredItem, *value, &binaryEncoding);
  370. if(!changed)
  371. return false;
  372. UA_Boolean storedValue = false;
  373. if(sub) {
  374. /* Allocate a new notification */
  375. UA_Notification *newNotification = (UA_Notification *)UA_malloc(sizeof(UA_Notification));
  376. if(!newNotification) {
  377. UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
  378. "Subscription %u | MonitoredItem %i | "
  379. "Item for the publishing queue could not be allocated",
  380. sub->subscriptionId, monitoredItem->monitoredItemId);
  381. UA_ByteString_deleteMembers(&binaryEncoding);
  382. return false;
  383. }
  384. /* <-- Point of no return --> */
  385. newNotification->mon = monitoredItem;
  386. newNotification->data.value = *value; /* Move the value to the notification */
  387. storedValue = true;
  388. /* Add the notification to the end of local and global queue */
  389. TAILQ_INSERT_TAIL(&monitoredItem->queue, newNotification, listEntry);
  390. TAILQ_INSERT_TAIL(&sub->notificationQueue, newNotification, globalEntry);
  391. ++monitoredItem->queueSize;
  392. ++sub->notificationQueueSize;
  393. /* Remove some notifications if the queue is beyond maximum capacity */
  394. MonitoredItem_ensureQueueSpace(server, monitoredItem);
  395. } else {
  396. /* Call the local callback if not attached to a subscription */
  397. UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) monitoredItem;
  398. void *nodeContext = NULL;
  399. UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &nodeContext);
  400. localMon->callback.dataChangeCallback(server, monitoredItem->monitoredItemId,
  401. localMon->context,
  402. &monitoredItem->monitoredNodeId,
  403. nodeContext, monitoredItem->attributeId,
  404. value);
  405. }
  406. /* Store the encoding for comparison */
  407. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  408. monitoredItem->lastSampledValue = binaryEncoding;
  409. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  410. UA_Variant_copy(&value->value, &monitoredItem->lastValue);
  411. return storedValue;
  412. }
  413. void
  414. UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  415. UA_Session *session = &adminSession;
  416. UA_UInt32 subscriptionId = 0;
  417. UA_Subscription *sub = monitoredItem->subscription;
  418. if(sub) {
  419. session = sub->session;
  420. subscriptionId = sub->subscriptionId;
  421. }
  422. if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  423. UA_LOG_DEBUG_SESSION(server->config.logger, session, "Subscription %u | "
  424. "MonitoredItem %i | Not a data change notification",
  425. subscriptionId, monitoredItem->monitoredItemId);
  426. return;
  427. }
  428. /* Sample the value */
  429. UA_ReadValueId rvid;
  430. UA_ReadValueId_init(&rvid);
  431. rvid.nodeId = monitoredItem->monitoredNodeId;
  432. rvid.attributeId = monitoredItem->attributeId;
  433. rvid.indexRange = monitoredItem->indexRange;
  434. UA_DataValue value = UA_Server_readWithSession(server, session, &rvid, monitoredItem->timestampsToReturn);
  435. /* Operate on the sample */
  436. UA_Boolean storedValue = sampleCallbackWithValue(server, monitoredItem, &value);
  437. /* Delete the sample if it was not stored in the MonitoredItem */
  438. if(!storedValue)
  439. UA_DataValue_deleteMembers(&value);
  440. }
  441. UA_StatusCode
  442. UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  443. if(mon->sampleCallbackIsRegistered)
  444. return UA_STATUSCODE_GOOD;
  445. UA_StatusCode retval =
  446. UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
  447. mon, (UA_UInt32)mon->samplingInterval, &mon->sampleCallbackId);
  448. if(retval == UA_STATUSCODE_GOOD)
  449. mon->sampleCallbackIsRegistered = true;
  450. return retval;
  451. }
  452. UA_StatusCode
  453. UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  454. if(!mon->sampleCallbackIsRegistered)
  455. return UA_STATUSCODE_GOOD;
  456. mon->sampleCallbackIsRegistered = false;
  457. return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
  458. }
  459. #endif /* UA_ENABLE_SUBSCRIPTIONS */