ua_subscription_datachange.c 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
  8. * Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
  9. */
  10. #include "ua_server_internal.h"
  11. #include "ua_subscription.h"
  12. #include "ua_types_encoding_binary.h"
  13. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  14. #define UA_VALUENCODING_MAXSTACK 512
  15. void UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
  16. memset(mon, 0, sizeof(UA_MonitoredItem));
  17. mon->subscription = sub;
  18. TAILQ_INIT(&mon->queue);
  19. }
  20. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  21. static UA_StatusCode
  22. removeMonitoredItemFromNodeCallback(UA_Server *server, UA_Session *session,
  23. UA_Node *node, void *data) {
  24. /* data is the monitoredItemID */
  25. /* catch edge case that it's the first element */
  26. if (data == ((UA_ObjectNode *) node)->monitoredItemQueue) {
  27. ((UA_ObjectNode *)node)->monitoredItemQueue = ((UA_MonitoredItem *)data)->next;
  28. return UA_STATUSCODE_GOOD;
  29. }
  30. /* SLIST_FOREACH */
  31. for (UA_MonitoredItem *entry = ((UA_ObjectNode *) node)->monitoredItemQueue->next;
  32. entry != NULL; entry=entry->next) {
  33. if (entry == (UA_MonitoredItem *)data) {
  34. /* SLIST_REMOVE */
  35. UA_MonitoredItem *iter = ((UA_ObjectNode *) node)->monitoredItemQueue;
  36. for (; iter->next != entry; iter=iter->next) {}
  37. iter->next = entry->next;
  38. UA_free(entry);
  39. break;
  40. }
  41. }
  42. return UA_STATUSCODE_GOOD;
  43. }
  44. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  45. void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  46. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  47. /* Remove the sampling callback */
  48. UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
  49. } else if (monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  50. /* TODO: Access val data.event */
  51. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  52. "MonitoredItemTypes other than ChangeNotify or EventNotify are not supported yet");
  53. }
  54. /* Remove the queued notifications if attached to a subscription */
  55. if(monitoredItem->subscription) {
  56. UA_Subscription *sub = monitoredItem->subscription;
  57. UA_Notification *notification, *notification_tmp;
  58. TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
  59. listEntry, notification_tmp) {
  60. /* Remove the item from the queues and free the memory */
  61. UA_Notification_delete(sub, monitoredItem, notification);
  62. }
  63. }
  64. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  65. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  66. /* Remove the monitored item from the node queue */
  67. UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
  68. removeMonitoredItemFromNodeCallback, monitoredItem);
  69. /* Delete the event filter */
  70. UA_EventFilter_deleteMembers(&monitoredItem->filter.eventFilter);
  71. }
  72. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  73. /* Remove the monitored item */
  74. if(monitoredItem->listEntry.le_prev != NULL)
  75. LIST_REMOVE(monitoredItem, listEntry);
  76. UA_String_deleteMembers(&monitoredItem->indexRange);
  77. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  78. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  79. UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
  80. UA_Server_delayedFree(server, monitoredItem);
  81. }
  82. UA_StatusCode
  83. MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
  84. if(mon->queueSize <= mon->maxQueueSize)
  85. return UA_STATUSCODE_GOOD;
  86. /* Remove notifications until the queue size is reached */
  87. UA_Subscription *sub = mon->subscription;
  88. while(mon->queueSize > mon->maxQueueSize) {
  89. UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
  90. /* Make sure that the MonitoredItem does not lose its place in the
  91. * global queue when notifications are removed. Otherwise the
  92. * MonitoredItem can "starve" itself by putting new notifications always
  93. * at the end of the global queue and removing the old ones.
  94. *
  95. * - If the oldest notification is removed, put the second oldest
  96. * notification right behind it.
  97. * - If the newest notification is removed, put the new notification
  98. * right behind it. */
  99. UA_Notification *del; /* The notification that will be deleted */
  100. UA_Notification *after_del; /* The notification to keep and move after del */
  101. if(mon->discardOldest) {
  102. /* Remove the oldest */
  103. del = TAILQ_FIRST(&mon->queue);
  104. after_del = TAILQ_NEXT(del, listEntry);
  105. } else {
  106. /* Remove the second newest (to keep the up-to-date notification) */
  107. after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
  108. del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
  109. }
  110. /* Move after_del right after del in the global queue */
  111. TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
  112. TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
  113. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  114. /* Create an overflow notification */
  115. /* if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) { */
  116. /* /\* EventFilterResult currently isn't being used */
  117. /* UA_EventFilterResult_deleteMembers(&del->data.event->result); *\/ */
  118. /* UA_EventFieldList_deleteMembers(&del->data.event.fields); */
  119. /* /\* cause an overflowEvent *\/ */
  120. /* /\* an overflowEvent does not care about event filters and as such */
  121. /* * will not be "triggered" correctly. Instead, a notification will */
  122. /* * be inserted into the queue which includes only the nodeId of the */
  123. /* * overflowEventType. It is up to the client to check for possible */
  124. /* * overflows. *\/ */
  125. /* UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification)); */
  126. /* if(!overflowNotification) */
  127. /* return UA_STATUSCODE_BADOUTOFMEMORY; */
  128. /* UA_EventFieldList_init(&overflowNotification->data.event.fields); */
  129. /* overflowNotification->data.event.fields.eventFields = UA_Variant_new(); */
  130. /* if(!overflowNotification->data.event.fields.eventFields) { */
  131. /* UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields); */
  132. /* UA_free(overflowNotification); */
  133. /* return UA_STATUSCODE_BADOUTOFMEMORY; */
  134. /* } */
  135. /* UA_Variant_init(overflowNotification->data.event.fields.eventFields); */
  136. /* overflowNotification->data.event.fields.eventFieldsSize = 1; */
  137. /* UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE); */
  138. /* UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields, */
  139. /* &overflowId, &UA_TYPES[UA_TYPES_NODEID]); */
  140. /* overflowNotification->mon = mon; */
  141. /* if(mon->discardOldest) { */
  142. /* TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry); */
  143. /* TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue, overflowNotification, globalEntry); */
  144. /* } else { */
  145. /* TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry); */
  146. /* TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue, overflowNotification, globalEntry); */
  147. /* } */
  148. /* ++mon->queueSize; */
  149. /* ++sub->notificationQueueSize; */
  150. /* ++sub->eventNotifications; */
  151. /* } */
  152. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  153. /* Delete the notification. This also removes the notification from the
  154. * linked lists. */
  155. UA_Notification_delete(sub, mon, del);
  156. }
  157. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  158. /* Get the element that carries the infobits */
  159. UA_Notification *notification = NULL;
  160. if(mon->discardOldest)
  161. notification = TAILQ_FIRST(&mon->queue);
  162. else
  163. notification = TAILQ_LAST(&mon->queue, NotificationQueue);
  164. UA_assert(notification);
  165. if(mon->maxQueueSize > 1) {
  166. /* Add the infobits either to the newest or the new last entry */
  167. notification->data.value.hasStatus = true;
  168. notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
  169. UA_STATUSCODE_INFOBITS_OVERFLOW);
  170. } else {
  171. /* If the queue size is reduced to one, remove the infobits */
  172. notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
  173. UA_STATUSCODE_INFOBITS_OVERFLOW);
  174. }
  175. }
  176. /* TODO: Infobits for Events? */
  177. return UA_STATUSCODE_GOOD;
  178. }
  179. #define ABS_SUBTRACT_TYPE_INDEPENDENT(a,b) ((a)>(b)?(a)-(b):(b)-(a))
  180. static UA_INLINE UA_Boolean
  181. outOfDeadBand(const void *data1, const void *data2, const size_t index,
  182. const UA_DataType *type, const UA_Double deadbandValue) {
  183. if(type == &UA_TYPES[UA_TYPES_SBYTE]) {
  184. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_SByte*)data1)[index],
  185. ((const UA_SByte*)data2)[index]) <= deadbandValue)
  186. return false;
  187. } else if(type == &UA_TYPES[UA_TYPES_BYTE]) {
  188. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Byte*)data1)[index],
  189. ((const UA_Byte*)data2)[index]) <= deadbandValue)
  190. return false;
  191. } else if(type == &UA_TYPES[UA_TYPES_INT16]) {
  192. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int16*)data1)[index],
  193. ((const UA_Int16*)data2)[index]) <= deadbandValue)
  194. return false;
  195. } else if(type == &UA_TYPES[UA_TYPES_UINT16]) {
  196. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt16*)data1)[index],
  197. ((const UA_UInt16*)data2)[index]) <= deadbandValue)
  198. return false;
  199. } else if(type == &UA_TYPES[UA_TYPES_INT32]) {
  200. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int32*)data1)[index],
  201. ((const UA_Int32*)data2)[index]) <= deadbandValue)
  202. return false;
  203. } else if(type == &UA_TYPES[UA_TYPES_UINT32]) {
  204. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt32*)data1)[index],
  205. ((const UA_UInt32*)data2)[index]) <= deadbandValue)
  206. return false;
  207. } else if(type == &UA_TYPES[UA_TYPES_INT64]) {
  208. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int64*)data1)[index],
  209. ((const UA_Int64*)data2)[index]) <= deadbandValue)
  210. return false;
  211. } else if(type == &UA_TYPES[UA_TYPES_UINT64]) {
  212. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt64*)data1)[index],
  213. ((const UA_UInt64*)data2)[index]) <= deadbandValue)
  214. return false;
  215. } else if(type == &UA_TYPES[UA_TYPES_FLOAT]) {
  216. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Float*)data1)[index],
  217. ((const UA_Float*)data2)[index]) <= deadbandValue)
  218. return false;
  219. } else if(type == &UA_TYPES[UA_TYPES_DOUBLE]) {
  220. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Double*)data1)[index],
  221. ((const UA_Double*)data2)[index]) <= deadbandValue)
  222. return false;
  223. }
  224. return true;
  225. }
  226. static UA_INLINE UA_Boolean
  227. updateNeededForFilteredValue(const UA_Variant *value, const UA_Variant *oldValue,
  228. const UA_Double deadbandValue) {
  229. if(value->arrayLength != oldValue->arrayLength)
  230. return true;
  231. if(value->type != oldValue->type)
  232. return true;
  233. if (UA_Variant_isScalar(value)) {
  234. return outOfDeadBand(value->data, oldValue->data, 0, value->type, deadbandValue);
  235. } else {
  236. for (size_t i = 0; i < value->arrayLength; ++i) {
  237. if (outOfDeadBand(value->data, oldValue->data, i, value->type, deadbandValue))
  238. return true;
  239. }
  240. }
  241. return false;
  242. }
  243. /* When a change is detected, encoding contains the heap-allocated binary encoded value */
  244. static UA_Boolean
  245. detectValueChangeWithFilter(UA_Server *server, UA_MonitoredItem *mon, UA_DataValue *value,
  246. UA_ByteString *encoding) {
  247. UA_Session *session = &adminSession;
  248. UA_UInt32 subscriptionId = 0;
  249. UA_Subscription *sub = mon->subscription;
  250. if(sub) {
  251. session = sub->session;
  252. subscriptionId = sub->subscriptionId;
  253. }
  254. if(isDataTypeNumeric(value->value.type) &&
  255. (mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
  256. mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
  257. if(mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE) {
  258. if(!updateNeededForFilteredValue(&value->value, &mon->lastValue,
  259. mon->filter.dataChangeFilter.deadbandValue))
  260. return false;
  261. }
  262. /* else if (mon->filter.deadbandType == UA_DEADBANDTYPE_PERCENT) {
  263. // TODO where do this EURange come from ?
  264. UA_Double deadbandValue = fabs(mon->filter.deadbandValue * (EURange.high-EURange.low));
  265. if (!updateNeededForFilteredValue(value->value, mon->lastValue, deadbandValue))
  266. return false;
  267. }*/
  268. }
  269. /* Stack-allocate some memory for the value encoding. We might heap-allocate
  270. * more memory if needed. This is just enough for scalars and small
  271. * structures. */
  272. UA_STACKARRAY(UA_Byte, stackValueEncoding, UA_VALUENCODING_MAXSTACK);
  273. UA_ByteString valueEncoding;
  274. valueEncoding.data = stackValueEncoding;
  275. valueEncoding.length = UA_VALUENCODING_MAXSTACK;
  276. /* Encode the value */
  277. UA_Byte *bufPos = valueEncoding.data;
  278. const UA_Byte *bufEnd = &valueEncoding.data[valueEncoding.length];
  279. UA_StatusCode retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
  280. &bufPos, &bufEnd, NULL, NULL);
  281. if(retval == UA_STATUSCODE_BADENCODINGLIMITSEXCEEDED) {
  282. size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
  283. if(binsize == 0)
  284. return false;
  285. retval = UA_ByteString_allocBuffer(&valueEncoding, binsize);
  286. if(retval == UA_STATUSCODE_GOOD) {
  287. bufPos = valueEncoding.data;
  288. bufEnd = &valueEncoding.data[valueEncoding.length];
  289. retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
  290. &bufPos, &bufEnd, NULL, NULL);
  291. }
  292. }
  293. if(retval != UA_STATUSCODE_GOOD) {
  294. UA_LOG_WARNING_SESSION(server->config.logger, session,
  295. "Subscription %u | MonitoredItem %i | "
  296. "Could not encode the value the MonitoredItem with status %s",
  297. subscriptionId, mon->monitoredItemId, UA_StatusCode_name(retval));
  298. return false;
  299. }
  300. /* Has the value changed? */
  301. valueEncoding.length = (uintptr_t)bufPos - (uintptr_t)valueEncoding.data;
  302. UA_Boolean changed = (!mon->lastSampledValue.data ||
  303. !UA_String_equal(&valueEncoding, &mon->lastSampledValue));
  304. /* No change */
  305. if(!changed) {
  306. if(valueEncoding.data != stackValueEncoding)
  307. UA_ByteString_deleteMembers(&valueEncoding);
  308. return false;
  309. }
  310. /* Change detected. Copy encoding on the heap if necessary. */
  311. if(valueEncoding.data == stackValueEncoding) {
  312. retval = UA_ByteString_copy(&valueEncoding, encoding);
  313. if(retval != UA_STATUSCODE_GOOD) {
  314. UA_LOG_WARNING_SESSION(server->config.logger, session,
  315. "Subscription %u | MonitoredItem %i | "
  316. "Detected change, but could not allocate memory for the notification"
  317. "with status %s", subscriptionId, mon->monitoredItemId,
  318. UA_StatusCode_name(retval));
  319. return false;
  320. }
  321. return true;
  322. }
  323. *encoding = valueEncoding;
  324. return true;
  325. }
  326. /* Has this sample changed from the last one? The method may allocate additional
  327. * space for the encoding buffer. Detect the change in encoding->data. */
  328. static UA_Boolean
  329. detectValueChange(UA_Server *server, UA_MonitoredItem *mon,
  330. UA_DataValue value, UA_ByteString *encoding) {
  331. /* Apply Filter */
  332. if(mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS)
  333. value.hasValue = false;
  334. value.hasServerTimestamp = false;
  335. value.hasServerPicoseconds = false;
  336. if(mon->filter.dataChangeFilter.trigger < UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
  337. value.hasSourceTimestamp = false;
  338. value.hasSourcePicoseconds = false;
  339. }
  340. /* Detect the value change */
  341. return detectValueChangeWithFilter(server, mon, &value, encoding);
  342. }
  343. /* Returns whether the sample was stored in the MonitoredItem */
  344. static UA_Boolean
  345. sampleCallbackWithValue(UA_Server *server, UA_MonitoredItem *monitoredItem,
  346. UA_DataValue *value) {
  347. UA_assert(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY);
  348. UA_Subscription *sub = monitoredItem->subscription;
  349. /* Contains heap-allocated binary encoding of the value if a change was detected */
  350. UA_ByteString binaryEncoding = UA_BYTESTRING_NULL;
  351. /* Has the value changed? Allocates memory in binaryEncoding if necessary.
  352. * value is edited internally so we make a shallow copy. */
  353. UA_Boolean changed = detectValueChange(server, monitoredItem, *value, &binaryEncoding);
  354. if(!changed)
  355. return false;
  356. UA_Boolean storedValue = false;
  357. if(sub) {
  358. /* Allocate a new notification */
  359. UA_Notification *newNotification = (UA_Notification *)UA_malloc(sizeof(UA_Notification));
  360. if(!newNotification) {
  361. UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
  362. "Subscription %u | MonitoredItem %i | "
  363. "Item for the publishing queue could not be allocated",
  364. sub->subscriptionId, monitoredItem->monitoredItemId);
  365. UA_ByteString_deleteMembers(&binaryEncoding);
  366. return false;
  367. }
  368. /* <-- Point of no return --> */
  369. newNotification->mon = monitoredItem;
  370. newNotification->data.value = *value; /* Move the value to the notification */
  371. storedValue = true;
  372. /* Enqueue the new notification */
  373. UA_Notification_enqueue(server, sub, monitoredItem, newNotification);
  374. } else {
  375. /* Call the local callback if not attached to a subscription */
  376. UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) monitoredItem;
  377. void *nodeContext = NULL;
  378. UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &nodeContext);
  379. localMon->callback.dataChangeCallback(server, monitoredItem->monitoredItemId,
  380. localMon->context,
  381. &monitoredItem->monitoredNodeId,
  382. nodeContext, monitoredItem->attributeId,
  383. value);
  384. }
  385. /* Store the encoding for comparison */
  386. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  387. monitoredItem->lastSampledValue = binaryEncoding;
  388. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  389. UA_Variant_copy(&value->value, &monitoredItem->lastValue);
  390. return storedValue;
  391. }
  392. void
  393. UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  394. UA_Session *session = &adminSession;
  395. UA_UInt32 subscriptionId = 0;
  396. UA_Subscription *sub = monitoredItem->subscription;
  397. if(sub) {
  398. session = sub->session;
  399. subscriptionId = sub->subscriptionId;
  400. }
  401. if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  402. UA_LOG_DEBUG_SESSION(server->config.logger, session, "Subscription %u | "
  403. "MonitoredItem %i | Not a data change notification",
  404. subscriptionId, monitoredItem->monitoredItemId);
  405. return;
  406. }
  407. /* Sample the value */
  408. UA_ReadValueId rvid;
  409. UA_ReadValueId_init(&rvid);
  410. rvid.nodeId = monitoredItem->monitoredNodeId;
  411. rvid.attributeId = monitoredItem->attributeId;
  412. rvid.indexRange = monitoredItem->indexRange;
  413. UA_DataValue value = UA_Server_readWithSession(server, session, &rvid, monitoredItem->timestampsToReturn);
  414. /* Operate on the sample */
  415. UA_Boolean storedValue = sampleCallbackWithValue(server, monitoredItem, &value);
  416. /* Delete the sample if it was not stored in the MonitoredItem */
  417. if(!storedValue)
  418. UA_DataValue_deleteMembers(&value);
  419. }
  420. UA_StatusCode
  421. UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  422. if(mon->sampleCallbackIsRegistered)
  423. return UA_STATUSCODE_GOOD;
  424. /* Only DataChange MonitoredItems have a callback with a sampling interval */
  425. if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)
  426. return UA_STATUSCODE_GOOD;
  427. UA_StatusCode retval =
  428. UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
  429. mon, (UA_UInt32)mon->samplingInterval, &mon->sampleCallbackId);
  430. if(retval == UA_STATUSCODE_GOOD)
  431. mon->sampleCallbackIsRegistered = true;
  432. return retval;
  433. }
  434. UA_StatusCode
  435. UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  436. if(!mon->sampleCallbackIsRegistered)
  437. return UA_STATUSCODE_GOOD;
  438. mon->sampleCallbackIsRegistered = false;
  439. return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
  440. }
  441. #endif /* UA_ENABLE_SUBSCRIPTIONS */