ua_subscription_datachange.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
  8. * Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2018 (c) Fabian Arndt, Root-Core
  10. */
  11. #include "ua_server_internal.h"
  12. #include "ua_subscription.h"
  13. #include "ua_types_encoding_binary.h"
  14. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  15. #define UA_VALUENCODING_MAXSTACK 512
  16. #define ABS_SUBTRACT_TYPE_INDEPENDENT(a,b) ((a)>(b)?(a)-(b):(b)-(a))
  17. static UA_Boolean
  18. outOfDeadBand(const void *data1, const void *data2, const size_t arrayPos,
  19. const UA_DataType *type, const UA_Double deadbandValue) {
  20. if(type == &UA_TYPES[UA_TYPES_BOOLEAN]) {
  21. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Boolean*)data1)[arrayPos],
  22. ((const UA_Boolean*)data2)[arrayPos]) <= deadbandValue)
  23. return false;
  24. } else if(type == &UA_TYPES[UA_TYPES_SBYTE]) {
  25. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_SByte*)data1)[arrayPos],
  26. ((const UA_SByte*)data2)[arrayPos]) <= deadbandValue)
  27. return false;
  28. } else if(type == &UA_TYPES[UA_TYPES_BYTE]) {
  29. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Byte*)data1)[arrayPos],
  30. ((const UA_Byte*)data2)[arrayPos]) <= deadbandValue)
  31. return false;
  32. } else if(type == &UA_TYPES[UA_TYPES_INT16]) {
  33. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int16*)data1)[arrayPos],
  34. ((const UA_Int16*)data2)[arrayPos]) <= deadbandValue)
  35. return false;
  36. } else if(type == &UA_TYPES[UA_TYPES_UINT16]) {
  37. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt16*)data1)[arrayPos],
  38. ((const UA_UInt16*)data2)[arrayPos]) <= deadbandValue)
  39. return false;
  40. } else if(type == &UA_TYPES[UA_TYPES_INT32]) {
  41. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int32*)data1)[arrayPos],
  42. ((const UA_Int32*)data2)[arrayPos]) <= deadbandValue)
  43. return false;
  44. } else if(type == &UA_TYPES[UA_TYPES_UINT32]) {
  45. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt32*)data1)[arrayPos],
  46. ((const UA_UInt32*)data2)[arrayPos]) <= deadbandValue)
  47. return false;
  48. } else if(type == &UA_TYPES[UA_TYPES_INT64]) {
  49. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Int64*)data1)[arrayPos],
  50. ((const UA_Int64*)data2)[arrayPos]) <= deadbandValue)
  51. return false;
  52. } else if(type == &UA_TYPES[UA_TYPES_UINT64]) {
  53. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_UInt64*)data1)[arrayPos],
  54. ((const UA_UInt64*)data2)[arrayPos]) <= deadbandValue)
  55. return false;
  56. } else if(type == &UA_TYPES[UA_TYPES_FLOAT]) {
  57. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Float*)data1)[arrayPos],
  58. ((const UA_Float*)data2)[arrayPos]) <= deadbandValue)
  59. return false;
  60. } else if(type == &UA_TYPES[UA_TYPES_DOUBLE]) {
  61. if(ABS_SUBTRACT_TYPE_INDEPENDENT(((const UA_Double*)data1)[arrayPos],
  62. ((const UA_Double*)data2)[arrayPos]) <= deadbandValue)
  63. return false;
  64. }
  65. return true;
  66. }
  67. static UA_INLINE UA_Boolean
  68. updateNeededForFilteredValue(const UA_Variant *value, const UA_Variant *oldValue,
  69. const UA_Double deadbandValue) {
  70. if(value->arrayLength != oldValue->arrayLength)
  71. return true;
  72. if(value->type != oldValue->type)
  73. return true;
  74. if (UA_Variant_isScalar(value)) {
  75. return outOfDeadBand(value->data, oldValue->data, 0, value->type, deadbandValue);
  76. } else {
  77. for (size_t i = 0; i < value->arrayLength; ++i) {
  78. if (outOfDeadBand(value->data, oldValue->data, i, value->type, deadbandValue))
  79. return true;
  80. }
  81. }
  82. return false;
  83. }
  84. /* When a change is detected, encoding contains the heap-allocated binary encoded value */
  85. static UA_Boolean
  86. detectValueChangeWithFilter(UA_Server *server, UA_Subscription *sub, UA_MonitoredItem *mon,
  87. UA_DataValue *value, UA_ByteString *encoding) {
  88. if(UA_DataType_isNumeric(value->value.type) &&
  89. (mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
  90. mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
  91. if(mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE) {
  92. if(!updateNeededForFilteredValue(&value->value, &mon->lastValue,
  93. mon->filter.dataChangeFilter.deadbandValue))
  94. return false;
  95. }
  96. /* else if (mon->filter.deadbandType == UA_DEADBANDTYPE_PERCENT) {
  97. // TODO where do this EURange come from ?
  98. UA_Double deadbandValue = fabs(mon->filter.deadbandValue * (EURange.high-EURange.low));
  99. if (!updateNeededForFilteredValue(value->value, mon->lastValue, deadbandValue))
  100. return false;
  101. }*/
  102. }
  103. /* Stack-allocate some memory for the value encoding. We might heap-allocate
  104. * more memory if needed. This is just enough for scalars and small
  105. * structures. */
  106. UA_STACKARRAY(UA_Byte, stackValueEncoding, UA_VALUENCODING_MAXSTACK);
  107. UA_ByteString valueEncoding;
  108. valueEncoding.data = stackValueEncoding;
  109. valueEncoding.length = UA_VALUENCODING_MAXSTACK;
  110. /* Encode the value */
  111. UA_Byte *bufPos = valueEncoding.data;
  112. const UA_Byte *bufEnd = &valueEncoding.data[valueEncoding.length];
  113. UA_StatusCode retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
  114. &bufPos, &bufEnd, NULL, NULL);
  115. if(retval == UA_STATUSCODE_BADENCODINGERROR) {
  116. size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
  117. if(binsize == 0)
  118. return false;
  119. if(binsize > UA_VALUENCODING_MAXSTACK) {
  120. retval = UA_ByteString_allocBuffer(&valueEncoding, binsize);
  121. if(retval == UA_STATUSCODE_GOOD) {
  122. bufPos = valueEncoding.data;
  123. bufEnd = &valueEncoding.data[valueEncoding.length];
  124. retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
  125. &bufPos, &bufEnd, NULL, NULL);
  126. }
  127. }
  128. }
  129. if(retval != UA_STATUSCODE_GOOD) {
  130. UA_LOG_WARNING_SESSION(&server->config.logger, sub ? sub->session : &server->adminSession,
  131. "Subscription %u | MonitoredItem %i | "
  132. "Could not encode the value the MonitoredItem with status %s",
  133. sub ? sub->subscriptionId : 0, mon->monitoredItemId,
  134. UA_StatusCode_name(retval));
  135. return false;
  136. }
  137. /* Has the value changed? */
  138. valueEncoding.length = (uintptr_t)bufPos - (uintptr_t)valueEncoding.data;
  139. UA_Boolean changed = (!mon->lastSampledValue.data ||
  140. !UA_String_equal(&valueEncoding, &mon->lastSampledValue));
  141. /* No change */
  142. if(!changed) {
  143. if(valueEncoding.data != stackValueEncoding)
  144. UA_ByteString_deleteMembers(&valueEncoding);
  145. return false;
  146. }
  147. /* Change detected. Copy encoding on the heap if necessary. */
  148. if(valueEncoding.data == stackValueEncoding) {
  149. retval = UA_ByteString_copy(&valueEncoding, encoding);
  150. if(retval != UA_STATUSCODE_GOOD) {
  151. UA_LOG_WARNING_SESSION(&server->config.logger, sub ? sub->session : &server->adminSession,
  152. "Subscription %u | MonitoredItem %i | "
  153. "Detected change, but could not allocate memory for the notification"
  154. "with status %s", sub ? sub->subscriptionId : 0,
  155. mon->monitoredItemId, UA_StatusCode_name(retval));
  156. return false;
  157. }
  158. return true;
  159. }
  160. *encoding = valueEncoding;
  161. return true;
  162. }
  163. /* Has this sample changed from the last one? The method may allocate additional
  164. * space for the encoding buffer. Detect the change in encoding->data. */
  165. static UA_Boolean
  166. detectValueChange(UA_Server *server, UA_MonitoredItem *mon,
  167. UA_DataValue value, UA_ByteString *encoding) {
  168. /* Apply Filter */
  169. if(mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS)
  170. value.hasValue = false;
  171. value.hasServerTimestamp = false;
  172. value.hasServerPicoseconds = false;
  173. if(mon->filter.dataChangeFilter.trigger < UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
  174. value.hasSourceTimestamp = false;
  175. value.hasSourcePicoseconds = false;
  176. }
  177. /* Detect the value change */
  178. return detectValueChangeWithFilter(server, mon->subscription, mon, &value, encoding);
  179. }
  180. /* Returns whether the sample was stored in the MonitoredItem */
  181. static UA_Boolean
  182. sampleCallbackWithValue(UA_Server *server, UA_MonitoredItem *monitoredItem,
  183. UA_DataValue *value) {
  184. UA_assert(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY);
  185. UA_Subscription *sub = monitoredItem->subscription;
  186. /* Contains heap-allocated binary encoding of the value if a change was detected */
  187. UA_ByteString binValueEncoding = UA_BYTESTRING_NULL;
  188. /* Has the value changed? Allocates memory in binValueEncoding if necessary.
  189. * value is edited internally so we make a shallow copy. */
  190. UA_Boolean changed = detectValueChange(server, monitoredItem, *value, &binValueEncoding);
  191. if(!changed)
  192. return false;
  193. UA_Boolean storedValue = false;
  194. if(sub) {
  195. /* Allocate a new notification */
  196. UA_Notification *newNotification = (UA_Notification *)UA_malloc(sizeof(UA_Notification));
  197. if(!newNotification) {
  198. UA_LOG_WARNING_SESSION(&server->config.logger, sub ? sub->session : &server->adminSession,
  199. "Subscription %u | MonitoredItem %i | "
  200. "Item for the publishing queue could not be allocated",
  201. sub->subscriptionId, monitoredItem->monitoredItemId);
  202. UA_ByteString_deleteMembers(&binValueEncoding);
  203. return false;
  204. }
  205. if(value->value.storageType == UA_VARIANT_DATA) {
  206. newNotification->data.value = *value; /* Move the value to the notification */
  207. storedValue = true;
  208. } else { /* => (value->value.storageType == UA_VARIANT_DATA_NODELETE) */
  209. UA_StatusCode retval = UA_DataValue_copy(value, &newNotification->data.value);
  210. if(retval != UA_STATUSCODE_GOOD) {
  211. UA_ByteString_deleteMembers(&binValueEncoding);
  212. UA_free(newNotification);
  213. return false;
  214. }
  215. }
  216. /* <-- Point of no return --> */
  217. /* Enqueue the new notification */
  218. newNotification->mon = monitoredItem;
  219. UA_Notification_enqueue(server, sub, monitoredItem, newNotification);
  220. } else {
  221. /* Call the local callback if not attached to a subscription */
  222. UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) monitoredItem;
  223. void *nodeContext = NULL;
  224. UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &nodeContext);
  225. localMon->callback.dataChangeCallback(server, monitoredItem->monitoredItemId,
  226. localMon->context,
  227. &monitoredItem->monitoredNodeId,
  228. nodeContext, monitoredItem->attributeId,
  229. value);
  230. }
  231. // If someone called UA_Server_deleteMonitoredItem in the user callback,
  232. // then the monitored item will be deleted soon. So, there is no need to
  233. // add the lastValue or lastSampledValue to it.
  234. //
  235. // If we do so, we will leak
  236. // the memory of that values, because UA_Server_deleteMonitoredItem
  237. // already deleted all members and scheduled the monitored item pointer
  238. // for later delete. In the later delete the monitored item will be deleted
  239. // and not the members.
  240. //
  241. // Also in the later delete, all type information is lost and a deleteMember
  242. // is not possible.
  243. //
  244. // We do detect if the monitored item is already defunct.
  245. if (!monitoredItem->sampleCallbackIsRegistered) {
  246. UA_ByteString_deleteMembers(&binValueEncoding);
  247. return storedValue;
  248. }
  249. /* Store the encoding for comparison */
  250. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  251. monitoredItem->lastSampledValue = binValueEncoding;
  252. /* Store the value for filter comparison (we don't want to decode
  253. * lastSampledValue in every iteration) */
  254. if((monitoredItem->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_PERCENT ||
  255. monitoredItem->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE) &&
  256. (monitoredItem->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
  257. monitoredItem->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
  258. UA_Variant_deleteMembers(&monitoredItem->lastValue);
  259. UA_Variant_copy(&value->value, &monitoredItem->lastValue);
  260. /* Don't test the return code here. If this fails, lastValue is empty
  261. * and a notification will be forced for the next deadband comparison. */
  262. }
  263. return storedValue;
  264. }
  265. void
  266. UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  267. UA_Subscription *sub = monitoredItem->subscription;
  268. UA_Session *session = &server->adminSession;
  269. if(sub)
  270. session = sub->session;
  271. if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  272. UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Subscription %u | "
  273. "MonitoredItem %i | Not a data change notification",
  274. sub ? sub->subscriptionId : 0, monitoredItem->monitoredItemId);
  275. return;
  276. }
  277. /* Get the node */
  278. const UA_Node *node = UA_Nodestore_get(server, &monitoredItem->monitoredNodeId);
  279. /* Sample the value. The sample can still point into the node. */
  280. UA_DataValue value;
  281. UA_DataValue_init(&value);
  282. if(node) {
  283. UA_ReadValueId rvid;
  284. UA_ReadValueId_init(&rvid);
  285. rvid.nodeId = monitoredItem->monitoredNodeId;
  286. rvid.attributeId = monitoredItem->attributeId;
  287. rvid.indexRange = monitoredItem->indexRange;
  288. ReadWithNode(node, server, session, monitoredItem->timestampsToReturn, &rvid, &value);
  289. } else {
  290. value.hasStatus = true;
  291. value.status = UA_STATUSCODE_BADNODEIDUNKNOWN;
  292. }
  293. /* Operate on the sample */
  294. UA_Boolean storedValue = sampleCallbackWithValue(server, monitoredItem, &value);
  295. /* Delete the sample if it was not stored in the MonitoredItem */
  296. if(!storedValue)
  297. UA_DataValue_deleteMembers(&value); /* Does nothing for UA_VARIANT_DATA_NODELETE */
  298. if(node)
  299. UA_Nodestore_release(server, node);
  300. }
  301. #endif /* UA_ENABLE_SUBSCRIPTIONS */