ua_subscription_datachange.c 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
  8. * Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
  9. */
  10. #include "ua_server_internal.h"
  11. #include "ua_subscription.h"
  12. #include "ua_types_encoding_binary.h"
  13. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  14. #define UA_VALUENCODING_MAXSTACK 512
  15. void UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
  16. memset(mon, 0, sizeof(UA_MonitoredItem));
  17. mon->subscription = sub;
  18. TAILQ_INIT(&mon->queue);
  19. }
  20. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  21. static UA_StatusCode
  22. removeMonitoredItemFromNodeCallback(UA_Server *server, UA_Session *session,
  23. UA_Node *node, void *data) {
  24. /* data is the monitoredItemID */
  25. /* catch edge case that it's the first element */
  26. if (data == ((UA_ObjectNode *) node)->monitoredItemQueue) {
  27. ((UA_ObjectNode *)node)->monitoredItemQueue = ((UA_MonitoredItem *)data)->next;
  28. return UA_STATUSCODE_GOOD;
  29. }
  30. /* SLIST_FOREACH */
  31. for (UA_MonitoredItem *entry = ((UA_ObjectNode *) node)->monitoredItemQueue->next;
  32. entry != NULL; entry=entry->next) {
  33. if (entry == (UA_MonitoredItem *)data) {
  34. /* SLIST_REMOVE */
  35. UA_MonitoredItem *iter = ((UA_ObjectNode *) node)->monitoredItemQueue;
  36. for (; iter->next != entry; iter=iter->next) {}
  37. iter->next = entry->next;
  38. UA_free(entry);
  39. break;
  40. }
  41. }
  42. return UA_STATUSCODE_GOOD;
  43. }
  44. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  45. void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  46. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  47. /* Remove the sampling callback */
  48. UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
  49. } else if (monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  50. /* TODO: Access val data.event */
  51. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  52. "MonitoredItemTypes other than ChangeNotify or EventNotify are not supported yet");
  53. }
  54. /* Remove the queued notifications if attached to a subscription */
  55. if(monitoredItem->subscription) {
  56. UA_Subscription *sub = monitoredItem->subscription;
  57. UA_Notification *notification, *notification_tmp;
  58. TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
  59. listEntry, notification_tmp) {
  60. /* Remove the item from the queues and free the memory */
  61. UA_Notification_delete(sub, monitoredItem, notification);
  62. }
  63. }
  64. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  65. if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
  66. /* Remove the monitored item from the node queue */
  67. UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
  68. removeMonitoredItemFromNodeCallback, monitoredItem);
  69. /* Delete the event filter */
  70. UA_EventFilter_deleteMembers(&monitoredItem->filter.eventFilter);
  71. }
  72. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  73. /* Remove the monitored item */
  74. UA_String_deleteMembers(&monitoredItem->indexRange);
  75. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  76. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  77. UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
  78. UA_Server_delayedFree(server, monitoredItem);
  79. }
  80. UA_StatusCode
  81. MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
  82. if(mon->queueSize <= mon->maxQueueSize)
  83. return UA_STATUSCODE_GOOD;
  84. /* Remove notifications until the queue size is reached */
  85. UA_Subscription *sub = mon->subscription;
  86. while(mon->queueSize > mon->maxQueueSize) {
  87. UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
  88. /* Make sure that the MonitoredItem does not lose its place in the
  89. * global queue when notifications are removed. Otherwise the
  90. * MonitoredItem can "starve" itself by putting new notifications always
  91. * at the end of the global queue and removing the old ones.
  92. *
  93. * - If the oldest notification is removed, put the second oldest
  94. * notification right behind it.
  95. * - If the newest notification is removed, put the new notification
  96. * right behind it. */
  97. UA_Notification *del; /* The notification that will be deleted */
  98. UA_Notification *after_del; /* The notification to keep and move after del */
  99. if(mon->discardOldest) {
  100. /* Remove the oldest */
  101. del = TAILQ_FIRST(&mon->queue);
  102. after_del = TAILQ_NEXT(del, listEntry);
  103. } else {
  104. /* Remove the second newest (to keep the up-to-date notification) */
  105. after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
  106. del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
  107. }
  108. /* Move after_del right after del in the global queue */
  109. TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
  110. TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
  111. #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
  112. /* Create an overflow notification */
  113. /* if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) { */
  114. /* /\* EventFilterResult currently isn't being used */
  115. /* UA_EventFilterResult_deleteMembers(&del->data.event->result); *\/ */
  116. /* UA_EventFieldList_deleteMembers(&del->data.event.fields); */
  117. /* /\* cause an overflowEvent *\/ */
  118. /* /\* an overflowEvent does not care about event filters and as such */
  119. /* * will not be "triggered" correctly. Instead, a notification will */
  120. /* * be inserted into the queue which includes only the nodeId of the */
  121. /* * overflowEventType. It is up to the client to check for possible */
  122. /* * overflows. *\/ */
  123. /* UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification)); */
  124. /* if(!overflowNotification) */
  125. /* return UA_STATUSCODE_BADOUTOFMEMORY; */
  126. /* UA_EventFieldList_init(&overflowNotification->data.event.fields); */
  127. /* overflowNotification->data.event.fields.eventFields = UA_Variant_new(); */
  128. /* if(!overflowNotification->data.event.fields.eventFields) { */
  129. /* UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields); */
  130. /* UA_free(overflowNotification); */
  131. /* return UA_STATUSCODE_BADOUTOFMEMORY; */
  132. /* } */
  133. /* UA_Variant_init(overflowNotification->data.event.fields.eventFields); */
  134. /* overflowNotification->data.event.fields.eventFieldsSize = 1; */
  135. /* UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE); */
  136. /* UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields, */
  137. /* &overflowId, &UA_TYPES[UA_TYPES_NODEID]); */
  138. /* overflowNotification->mon = mon; */
  139. /* if(mon->discardOldest) { */
  140. /* TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry); */
  141. /* TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue, overflowNotification, globalEntry); */
  142. /* } else { */
  143. /* TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry); */
  144. /* TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue, overflowNotification, globalEntry); */
  145. /* } */
  146. /* ++mon->queueSize; */
  147. /* ++sub->notificationQueueSize; */
  148. /* ++sub->eventNotifications; */
  149. /* } */
  150. #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
  151. /* Delete the notification. This also removes the notification from the
  152. * linked lists. */
  153. UA_Notification_delete(sub, mon, del);
  154. }
  155. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  156. /* Get the element that carries the infobits */
  157. UA_Notification *notification = NULL;
  158. if(mon->discardOldest)
  159. notification = TAILQ_FIRST(&mon->queue);
  160. else
  161. notification = TAILQ_LAST(&mon->queue, NotificationQueue);
  162. UA_assert(notification);
  163. if(mon->maxQueueSize > 1) {
  164. /* Add the infobits either to the newest or the new last entry */
  165. notification->data.value.hasStatus = true;
  166. notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
  167. UA_STATUSCODE_INFOBITS_OVERFLOW);
  168. } else {
  169. /* If the queue size is reduced to one, remove the infobits */
  170. notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
  171. UA_STATUSCODE_INFOBITS_OVERFLOW);
  172. }
  173. }
  174. /* TODO: Infobits for Events? */
  175. return UA_STATUSCODE_GOOD;
  176. }
  177. #define ABS_SUBTRACT_TYPE_INDEPENDENT(a,b) ((a)>(b)?(a)-(b):(b)-(a))
  178. static UA_INLINE UA_Boolean
  179. outOfDeadBand(const void *data1, const void *data2, const size_t index,
  180. const UA_DataType *type, const UA_Double deadbandValue) {
  181. if(type == &UA_TYPES[UA_TYPES_SBYTE]) {
  182. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_SByte*)data1)[index],
  183. ((const UA_SByte*)data2)[index]) <= deadbandValue)
  184. return false;
  185. } else if(type == &UA_TYPES[UA_TYPES_BYTE]) {
  186. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Byte*)data1)[index],
  187. ((const UA_Byte*)data2)[index]) <= deadbandValue)
  188. return false;
  189. } else if(type == &UA_TYPES[UA_TYPES_INT16]) {
  190. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int16*)data1)[index],
  191. ((const UA_Int16*)data2)[index]) <= deadbandValue)
  192. return false;
  193. } else if(type == &UA_TYPES[UA_TYPES_UINT16]) {
  194. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt16*)data1)[index],
  195. ((const UA_UInt16*)data2)[index]) <= deadbandValue)
  196. return false;
  197. } else if(type == &UA_TYPES[UA_TYPES_INT32]) {
  198. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int32*)data1)[index],
  199. ((const UA_Int32*)data2)[index]) <= deadbandValue)
  200. return false;
  201. } else if(type == &UA_TYPES[UA_TYPES_UINT32]) {
  202. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt32*)data1)[index],
  203. ((const UA_UInt32*)data2)[index]) <= deadbandValue)
  204. return false;
  205. } else if(type == &UA_TYPES[UA_TYPES_INT64]) {
  206. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int64*)data1)[index],
  207. ((const UA_Int64*)data2)[index]) <= deadbandValue)
  208. return false;
  209. } else if(type == &UA_TYPES[UA_TYPES_UINT64]) {
  210. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt64*)data1)[index],
  211. ((const UA_UInt64*)data2)[index]) <= deadbandValue)
  212. return false;
  213. } else if(type == &UA_TYPES[UA_TYPES_FLOAT]) {
  214. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Float*)data1)[index],
  215. ((const UA_Float*)data2)[index]) <= deadbandValue)
  216. return false;
  217. } else if(type == &UA_TYPES[UA_TYPES_DOUBLE]) {
  218. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Double*)data1)[index],
  219. ((const UA_Double*)data2)[index]) <= deadbandValue)
  220. return false;
  221. }
  222. return true;
  223. }
  224. static UA_INLINE UA_Boolean
  225. updateNeededForFilteredValue(const UA_Variant *value, const UA_Variant *oldValue,
  226. const UA_Double deadbandValue) {
  227. if(value->arrayLength != oldValue->arrayLength)
  228. return true;
  229. if(value->type != oldValue->type)
  230. return true;
  231. if (UA_Variant_isScalar(value)) {
  232. return outOfDeadBand(value->data, oldValue->data, 0, value->type, deadbandValue);
  233. } else {
  234. for (size_t i = 0; i < value->arrayLength; ++i) {
  235. if (outOfDeadBand(value->data, oldValue->data, i, value->type, deadbandValue))
  236. return true;
  237. }
  238. }
  239. return false;
  240. }
  241. /* When a change is detected, encoding contains the heap-allocated binary encoded value */
  242. static UA_Boolean
  243. detectValueChangeWithFilter(UA_Server *server, UA_MonitoredItem *mon, UA_DataValue *value,
  244. UA_ByteString *encoding) {
  245. UA_Session *session = &adminSession;
  246. UA_UInt32 subscriptionId = 0;
  247. UA_Subscription *sub = mon->subscription;
  248. if(sub) {
  249. session = sub->session;
  250. subscriptionId = sub->subscriptionId;
  251. }
  252. if(isDataTypeNumeric(value->value.type) &&
  253. (mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
  254. mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
  255. if(mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE) {
  256. if(!updateNeededForFilteredValue(&value->value, &mon->lastValue,
  257. mon->filter.dataChangeFilter.deadbandValue))
  258. return false;
  259. }
  260. /* else if (mon->filter.deadbandType == UA_DEADBANDTYPE_PERCENT) {
  261. // TODO where do this EURange come from ?
  262. UA_Double deadbandValue = fabs(mon->filter.deadbandValue * (EURange.high-EURange.low));
  263. if (!updateNeededForFilteredValue(value->value, mon->lastValue, deadbandValue))
  264. return false;
  265. }*/
  266. }
  267. /* Stack-allocate some memory for the value encoding. We might heap-allocate
  268. * more memory if needed. This is just enough for scalars and small
  269. * structures. */
  270. UA_STACKARRAY(UA_Byte, stackValueEncoding, UA_VALUENCODING_MAXSTACK);
  271. UA_ByteString valueEncoding;
  272. valueEncoding.data = stackValueEncoding;
  273. valueEncoding.length = UA_VALUENCODING_MAXSTACK;
  274. /* Encode the value */
  275. UA_Byte *bufPos = valueEncoding.data;
  276. const UA_Byte *bufEnd = &valueEncoding.data[valueEncoding.length];
  277. UA_StatusCode retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
  278. &bufPos, &bufEnd, NULL, NULL);
  279. if(retval == UA_STATUSCODE_BADENCODINGLIMITSEXCEEDED) {
  280. size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
  281. if(binsize == 0)
  282. return false;
  283. retval = UA_ByteString_allocBuffer(&valueEncoding, binsize);
  284. if(retval == UA_STATUSCODE_GOOD) {
  285. bufPos = valueEncoding.data;
  286. bufEnd = &valueEncoding.data[valueEncoding.length];
  287. retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
  288. &bufPos, &bufEnd, NULL, NULL);
  289. }
  290. }
  291. if(retval != UA_STATUSCODE_GOOD) {
  292. UA_LOG_WARNING_SESSION(server->config.logger, session,
  293. "Subscription %u | MonitoredItem %i | "
  294. "Could not encode the value the MonitoredItem with status %s",
  295. subscriptionId, mon->monitoredItemId, UA_StatusCode_name(retval));
  296. return false;
  297. }
  298. /* Has the value changed? */
  299. valueEncoding.length = (uintptr_t)bufPos - (uintptr_t)valueEncoding.data;
  300. UA_Boolean changed = (!mon->lastSampledValue.data ||
  301. !UA_String_equal(&valueEncoding, &mon->lastSampledValue));
  302. /* No change */
  303. if(!changed) {
  304. if(valueEncoding.data != stackValueEncoding)
  305. UA_ByteString_deleteMembers(&valueEncoding);
  306. return false;
  307. }
  308. /* Change detected. Copy encoding on the heap if necessary. */
  309. if(valueEncoding.data == stackValueEncoding) {
  310. retval = UA_ByteString_copy(&valueEncoding, encoding);
  311. if(retval != UA_STATUSCODE_GOOD) {
  312. UA_LOG_WARNING_SESSION(server->config.logger, session,
  313. "Subscription %u | MonitoredItem %i | "
  314. "Detected change, but could not allocate memory for the notification"
  315. "with status %s", subscriptionId, mon->monitoredItemId,
  316. UA_StatusCode_name(retval));
  317. return false;
  318. }
  319. return true;
  320. }
  321. *encoding = valueEncoding;
  322. return true;
  323. }
  324. /* Has this sample changed from the last one? The method may allocate additional
  325. * space for the encoding buffer. Detect the change in encoding->data. */
  326. static UA_Boolean
  327. detectValueChange(UA_Server *server, UA_MonitoredItem *mon,
  328. UA_DataValue value, UA_ByteString *encoding) {
  329. /* Apply Filter */
  330. if(mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS)
  331. value.hasValue = false;
  332. value.hasServerTimestamp = false;
  333. value.hasServerPicoseconds = false;
  334. if(mon->filter.dataChangeFilter.trigger < UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
  335. value.hasSourceTimestamp = false;
  336. value.hasSourcePicoseconds = false;
  337. }
  338. /* Detect the value change */
  339. return detectValueChangeWithFilter(server, mon, &value, encoding);
  340. }
  341. /* Returns whether the sample was stored in the MonitoredItem */
  342. static UA_Boolean
  343. sampleCallbackWithValue(UA_Server *server, UA_MonitoredItem *monitoredItem,
  344. UA_DataValue *value) {
  345. UA_assert(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY);
  346. UA_Subscription *sub = monitoredItem->subscription;
  347. /* Contains heap-allocated binary encoding of the value if a change was detected */
  348. UA_ByteString binaryEncoding = UA_BYTESTRING_NULL;
  349. /* Has the value changed? Allocates memory in binaryEncoding if necessary.
  350. * value is edited internally so we make a shallow copy. */
  351. UA_Boolean changed = detectValueChange(server, monitoredItem, *value, &binaryEncoding);
  352. if(!changed)
  353. return false;
  354. UA_Boolean storedValue = false;
  355. if(sub) {
  356. /* Allocate a new notification */
  357. UA_Notification *newNotification = (UA_Notification *)UA_malloc(sizeof(UA_Notification));
  358. if(!newNotification) {
  359. UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
  360. "Subscription %u | MonitoredItem %i | "
  361. "Item for the publishing queue could not be allocated",
  362. sub->subscriptionId, monitoredItem->monitoredItemId);
  363. UA_ByteString_deleteMembers(&binaryEncoding);
  364. return false;
  365. }
  366. /* <-- Point of no return --> */
  367. newNotification->mon = monitoredItem;
  368. newNotification->data.value = *value; /* Move the value to the notification */
  369. storedValue = true;
  370. /* Enqueue the new notification */
  371. UA_Notification_enqueue(server, sub, monitoredItem, newNotification);
  372. } else {
  373. /* Call the local callback if not attached to a subscription */
  374. UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) monitoredItem;
  375. void *nodeContext = NULL;
  376. UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &nodeContext);
  377. localMon->callback.dataChangeCallback(server, monitoredItem->monitoredItemId,
  378. localMon->context,
  379. &monitoredItem->monitoredNodeId,
  380. nodeContext, monitoredItem->attributeId,
  381. value);
  382. }
  383. /* Store the encoding for comparison */
  384. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  385. monitoredItem->lastSampledValue = binaryEncoding;
  386. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  387. UA_Variant_copy(&value->value, &monitoredItem->lastValue);
  388. return storedValue;
  389. }
  390. void
  391. UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  392. UA_Session *session = &adminSession;
  393. UA_UInt32 subscriptionId = 0;
  394. UA_Subscription *sub = monitoredItem->subscription;
  395. if(sub) {
  396. session = sub->session;
  397. subscriptionId = sub->subscriptionId;
  398. }
  399. if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  400. UA_LOG_DEBUG_SESSION(server->config.logger, session, "Subscription %u | "
  401. "MonitoredItem %i | Not a data change notification",
  402. subscriptionId, monitoredItem->monitoredItemId);
  403. return;
  404. }
  405. /* Sample the value */
  406. UA_ReadValueId rvid;
  407. UA_ReadValueId_init(&rvid);
  408. rvid.nodeId = monitoredItem->monitoredNodeId;
  409. rvid.attributeId = monitoredItem->attributeId;
  410. rvid.indexRange = monitoredItem->indexRange;
  411. UA_DataValue value = UA_Server_readWithSession(server, session, &rvid, monitoredItem->timestampsToReturn);
  412. /* Operate on the sample */
  413. UA_Boolean storedValue = sampleCallbackWithValue(server, monitoredItem, &value);
  414. /* Delete the sample if it was not stored in the MonitoredItem */
  415. if(!storedValue)
  416. UA_DataValue_deleteMembers(&value);
  417. }
  418. UA_StatusCode
  419. UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  420. if(mon->sampleCallbackIsRegistered)
  421. return UA_STATUSCODE_GOOD;
  422. /* Only DataChange MonitoredItems have a callback with a sampling interval */
  423. if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)
  424. return UA_STATUSCODE_GOOD;
  425. UA_StatusCode retval =
  426. UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
  427. mon, (UA_UInt32)mon->samplingInterval, &mon->sampleCallbackId);
  428. if(retval == UA_STATUSCODE_GOOD)
  429. mon->sampleCallbackIsRegistered = true;
  430. return retval;
  431. }
  432. UA_StatusCode
  433. UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
  434. if(!mon->sampleCallbackIsRegistered)
  435. return UA_STATUSCODE_GOOD;
  436. mon->sampleCallbackIsRegistered = false;
  437. return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
  438. }
  439. #endif /* UA_ENABLE_SUBSCRIPTIONS */