ua_subscription.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. #include "ua_subscription.h"
  2. #include "ua_server_internal.h"
  3. #include "ua_nodestore.h"
  4. /****************/
  5. /* Subscription */
  6. /****************/
  7. UA_Subscription *UA_Subscription_new(UA_Int32 subscriptionID) {
  8. UA_Subscription *new = UA_malloc(sizeof(UA_Subscription));
  9. if(!new)
  10. return UA_NULL;
  11. new->subscriptionID = subscriptionID;
  12. new->lastPublished = 0;
  13. new->sequenceNumber = 1;
  14. memset(&new->timedUpdateJobGuid, 0, sizeof(UA_Guid));
  15. new->timedUpdateJob = UA_NULL;
  16. new->timedUpdateIsRegistered = UA_FALSE;
  17. LIST_INIT(&new->MonitoredItems);
  18. LIST_INIT(&new->unpublishedNotifications);
  19. return new;
  20. }
  21. void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server) {
  22. UA_MonitoredItem *mon, *tmp_mon;
  23. // Just in case any parallel process attempts to access this subscription
  24. // while we are deleting it... make it vanish.
  25. subscription->subscriptionID = 0;
  26. // Delete monitored Items
  27. LIST_FOREACH_SAFE(mon, &subscription->MonitoredItems, listEntry, tmp_mon) {
  28. LIST_REMOVE(mon, listEntry);
  29. MonitoredItem_delete(mon);
  30. }
  31. // Delete unpublished Notifications
  32. Subscription_deleteUnpublishedNotification(0, true, subscription);
  33. // Unhook/Unregister any timed work assiociated with this subscription
  34. if(subscription->timedUpdateJob != UA_NULL){
  35. Subscription_unregisterUpdateJob(server, subscription);
  36. UA_free(subscription->timedUpdateJob);
  37. }
  38. }
  39. UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
  40. if(!subscription)
  41. return 0;
  42. UA_UInt32 j = 0;
  43. UA_unpublishedNotification *i;
  44. LIST_FOREACH(i, &subscription->unpublishedNotifications, listEntry)
  45. j++;
  46. return j;
  47. }
  48. void Subscription_generateKeepAlive(UA_Subscription *subscription) {
  49. if(subscription->keepAliveCount.currentValue > subscription->keepAliveCount.minValue &&
  50. subscription->keepAliveCount.currentValue <= subscription->keepAliveCount.maxValue)
  51. return;
  52. UA_unpublishedNotification *msg = UA_malloc(sizeof(UA_unpublishedNotification));
  53. if(!msg)
  54. return;
  55. msg->notification = UA_NULL;
  56. msg->notification = UA_malloc(sizeof(UA_NotificationMessage));
  57. msg->notification->notificationData = UA_NULL;
  58. // KeepAlive uses next message, but does not increment counter
  59. msg->notification->sequenceNumber = subscription->sequenceNumber + 1;
  60. msg->notification->publishTime = UA_DateTime_now();
  61. msg->notification->notificationDataSize = 0;
  62. LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
  63. subscription->keepAliveCount.currentValue = subscription->keepAliveCount.maxValue;
  64. }
  65. void Subscription_updateNotifications(UA_Subscription *subscription) {
  66. UA_MonitoredItem *mon;
  67. //MonitoredItem_queuedValue *queuedValue;
  68. UA_unpublishedNotification *msg;
  69. UA_UInt32 monItemsChangeT = 0, monItemsStatusT = 0, monItemsEventT = 0;
  70. UA_DataChangeNotification *changeNotification;
  71. size_t notificationOffset;
  72. if(!subscription || subscription->lastPublished + subscription->publishingInterval > UA_DateTime_now())
  73. return;
  74. // Make sure there is data to be published and establish which message types
  75. // will need to be generated
  76. LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
  77. // Check if this MonitoredItems Queue holds data and how much data is held in total
  78. if(!TAILQ_FIRST(&mon->queue))
  79. continue;
  80. if((mon->monitoredItemType & MONITOREDITEM_TYPE_CHANGENOTIFY) != 0)
  81. monItemsChangeT+=mon->queueSize.currentValue;
  82. else if((mon->monitoredItemType & MONITOREDITEM_TYPE_STATUSNOTIFY) != 0)
  83. monItemsStatusT+=mon->queueSize.currentValue;
  84. else if((mon->monitoredItemType & MONITOREDITEM_TYPE_EVENTNOTIFY) != 0)
  85. monItemsEventT+=mon->queueSize.currentValue;
  86. }
  87. // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
  88. if(Subscription_queuedNotifications(subscription) >= 10) {
  89. // Remove last entry
  90. Subscription_deleteUnpublishedNotification(0, true, subscription);
  91. }
  92. if(monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
  93. // Decrement KeepAlive
  94. subscription->keepAliveCount.currentValue--;
  95. // +- Generate KeepAlive msg if counter overruns
  96. if (subscription->keepAliveCount.currentValue < subscription->keepAliveCount.minValue)
  97. Subscription_generateKeepAlive(subscription);
  98. return;
  99. }
  100. msg = (UA_unpublishedNotification *) UA_malloc(sizeof(UA_unpublishedNotification));
  101. msg->notification = UA_malloc(sizeof(UA_NotificationMessage));
  102. INITPOINTER(msg->notification->notificationData);
  103. msg->notification->sequenceNumber = subscription->sequenceNumber++;
  104. msg->notification->publishTime = UA_DateTime_now();
  105. // NotificationData is an array of Change, Status and Event messages, each containing the appropriate
  106. // list of Queued values from all monitoredItems of that type
  107. msg->notification->notificationDataSize = ISNOTZERO(monItemsChangeT);
  108. // + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
  109. msg->notification->notificationData = UA_Array_new(&UA_TYPES[UA_TYPES_EXTENSIONOBJECT],
  110. msg->notification->notificationDataSize);
  111. for(int notmsgn=0; notmsgn < msg->notification->notificationDataSize; notmsgn++) {
  112. // Set the notification message type and encoding for each of
  113. // the three possible NotificationData Types
  114. msg->notification->notificationData[notmsgn].encoding = 1; // Encoding is always binary
  115. msg->notification->notificationData[notmsgn].typeId = UA_NODEID_NUMERIC(0, 811);
  116. if(notmsgn == 0) {
  117. // Construct a DataChangeNotification
  118. changeNotification = UA_malloc(sizeof(UA_DataChangeNotification));
  119. // Create one DataChangeNotification for each queue item held in each monitoredItems queue:
  120. changeNotification->monitoredItems = UA_Array_new(&UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION],
  121. monItemsChangeT);
  122. // Scan all monitoredItems in this subscription and have their queue transformed into an Array of
  123. // the propper NotificationMessageType (Status, Change, Event)
  124. monItemsChangeT = 0;
  125. LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
  126. if(mon->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || !TAILQ_FIRST(&mon->queue))
  127. continue;
  128. // Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
  129. monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications(&changeNotification->monitoredItems[monItemsChangeT], mon);
  130. MonitoredItem_ClearQueue(mon);
  131. }
  132. changeNotification->monitoredItemsSize = monItemsChangeT;
  133. changeNotification->diagnosticInfosSize = 0;
  134. changeNotification->diagnosticInfos = UA_NULL;
  135. msg->notification->notificationData[notmsgn].body.length =
  136. UA_calcSizeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
  137. msg->notification->notificationData[notmsgn].body.data =
  138. UA_calloc(msg->notification->notificationData[notmsgn].body.length, sizeof(UA_Byte));
  139. notificationOffset = 0;
  140. UA_encodeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION],
  141. &msg->notification->notificationData[notmsgn].body, &notificationOffset);
  142. // FIXME: Not properly freed!
  143. for(unsigned int i=0; i<monItemsChangeT; i++) {
  144. UA_MonitoredItemNotification *thisNotification = &(changeNotification->monitoredItems[i]);
  145. UA_DataValue_deleteMembers(&(thisNotification->value));
  146. }
  147. UA_free(changeNotification->monitoredItems);
  148. UA_free(changeNotification);
  149. } else if(notmsgn == 1) {
  150. // FIXME: Constructing a StatusChangeNotification is not implemented
  151. } else if(notmsgn == 2) {
  152. // FIXME: Constructing a EventListNotification is not implemented
  153. }
  154. }
  155. LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
  156. }
  157. UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
  158. UA_UInt32 *seqArray = UA_malloc(sizeof(UA_UInt32) * Subscription_queuedNotifications(sub));
  159. if(!seqArray)
  160. return UA_NULL;
  161. int i = 0;
  162. UA_unpublishedNotification *not;
  163. LIST_FOREACH(not, &sub->unpublishedNotifications, listEntry) {
  164. seqArray[i] = not->notification->sequenceNumber;
  165. i++;
  166. }
  167. return seqArray;
  168. }
  169. void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub) {
  170. if(!dst)
  171. return;
  172. if(Subscription_queuedNotifications(sub) == 0) {
  173. dst->notificationDataSize = 0;
  174. dst->publishTime = UA_DateTime_now();
  175. dst->sequenceNumber = 0;
  176. return;
  177. }
  178. UA_NotificationMessage *latest = LIST_FIRST(&sub->unpublishedNotifications)->notification;
  179. dst->notificationDataSize = latest->notificationDataSize;
  180. dst->publishTime = latest->publishTime;
  181. dst->sequenceNumber = latest->sequenceNumber;
  182. if(latest->notificationDataSize == 0)
  183. return;
  184. dst->notificationData = (UA_ExtensionObject *) UA_malloc(sizeof(UA_ExtensionObject));
  185. dst->notificationData->encoding = latest->notificationData->encoding;
  186. dst->notificationData->typeId = latest->notificationData->typeId;
  187. UA_ByteString_copy(&latest->notificationData->body,
  188. &dst->notificationData->body);
  189. }
  190. UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean bDeleteAll, UA_Subscription *sub) {
  191. UA_UInt32 deletedItems = 0;
  192. UA_unpublishedNotification *not, *tmp;
  193. LIST_FOREACH_SAFE(not, &sub->unpublishedNotifications, listEntry, tmp) {
  194. if (!bDeleteAll && not->notification->sequenceNumber != seqNo)
  195. continue;
  196. LIST_REMOVE(not, listEntry);
  197. if(not->notification) {
  198. if(not->notification->notificationData) {
  199. if(not->notification->notificationData->body.data)
  200. UA_free(not->notification->notificationData->body.data);
  201. UA_free(not->notification->notificationData);
  202. }
  203. UA_free(not->notification);
  204. }
  205. UA_free(not);
  206. deletedItems++;
  207. }
  208. return deletedItems;
  209. }
  210. static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data) {
  211. // Timed-Worker/Job Version of updateNotifications
  212. UA_Subscription *sub = (UA_Subscription *) data;
  213. UA_MonitoredItem *mon;
  214. if(!data || !server)
  215. return;
  216. // This is set by the Subscription_delete function to detere us from fiddling with
  217. // this subscription if it is being deleted (not technically thread save, but better
  218. // then nothing at all)
  219. if(sub->subscriptionID == 0)
  220. return;
  221. // FIXME: This should be done by the event system
  222. LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
  223. MonitoredItem_QueuePushDataValue(server, mon);
  224. Subscription_updateNotifications(sub);
  225. }
  226. UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub) {
  227. if(server == UA_NULL || sub == UA_NULL)
  228. return UA_STATUSCODE_BADSERVERINDEXINVALID;
  229. UA_Job *theWork;
  230. theWork = (UA_Job *) UA_malloc(sizeof(UA_Job));
  231. if(!theWork)
  232. return UA_STATUSCODE_BADOUTOFMEMORY;
  233. *theWork = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
  234. .job.methodCall = {.method = Subscription_timedUpdateNotificationsJob, .data = sub} };
  235. sub->timedUpdateJobGuid = jobId;
  236. sub->timedUpdateJob = theWork;
  237. return UA_STATUSCODE_GOOD;
  238. }
  239. UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub) {
  240. if(server == UA_NULL || sub == UA_NULL)
  241. return UA_STATUSCODE_BADSERVERINDEXINVALID;
  242. if(sub->publishingInterval <= 5 )
  243. return UA_STATUSCODE_BADNOTSUPPORTED;
  244. // Practically enough, the client sends a uint32 in ms, which we store as datetime, which here is required in as uint32 in ms as the interval
  245. UA_StatusCode retval = UA_Server_addRepeatedJob(server, *sub->timedUpdateJob, sub->publishingInterval,
  246. &sub->timedUpdateJobGuid);
  247. if(!retval)
  248. sub->timedUpdateIsRegistered = UA_TRUE;
  249. return retval;
  250. }
  251. UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub) {
  252. if(server == UA_NULL || sub == UA_NULL)
  253. return UA_STATUSCODE_BADSERVERINDEXINVALID;
  254. UA_Int32 retval = UA_Server_removeRepeatedJob(server, sub->timedUpdateJobGuid);
  255. sub->timedUpdateIsRegistered = UA_FALSE;
  256. return retval;
  257. }
  258. /*****************/
  259. /* MonitoredItem */
  260. /*****************/
  261. UA_MonitoredItem *UA_MonitoredItem_new() {
  262. UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
  263. new->queueSize = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
  264. new->lastSampled = 0;
  265. // FIXME: This is currently hardcoded;
  266. new->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
  267. TAILQ_INIT(&new->queue);
  268. UA_NodeId_init(&new->monitoredNodeId);
  269. INITPOINTER(new->lastSampledValue.data );
  270. return new;
  271. }
  272. void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
  273. // Delete Queued Data
  274. MonitoredItem_ClearQueue(monitoredItem);
  275. // Remove from subscription list
  276. LIST_REMOVE(monitoredItem, listEntry);
  277. // Release comparison sample
  278. if(monitoredItem->lastSampledValue.data != NULL) {
  279. UA_free(monitoredItem->lastSampledValue.data);
  280. }
  281. UA_NodeId_deleteMembers(&(monitoredItem->monitoredNodeId));
  282. UA_free(monitoredItem);
  283. }
  284. int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
  285. UA_MonitoredItem *monitoredItem) {
  286. int queueSize = 0;
  287. MonitoredItem_queuedValue *queueItem;
  288. // Count instead of relying on the items currentValue
  289. TAILQ_FOREACH(queueItem, &monitoredItem->queue, listEntry) {
  290. dst[queueSize].clientHandle = monitoredItem->clientHandle;
  291. dst[queueSize].value.hasServerPicoseconds = UA_FALSE;
  292. dst[queueSize].value.hasServerTimestamp = UA_TRUE;
  293. dst[queueSize].value.serverTimestamp = UA_DateTime_now();
  294. dst[queueSize].value.hasSourcePicoseconds = UA_FALSE;
  295. dst[queueSize].value.hasSourceTimestamp = UA_TRUE;
  296. dst[queueSize].value.sourceTimestamp = UA_DateTime_now();
  297. dst[queueSize].value.hasValue = UA_TRUE;
  298. dst[queueSize].value.status = UA_STATUSCODE_GOOD;
  299. UA_Variant_copy(&(queueItem->value), &(dst[queueSize].value.value));
  300. // Do not create variants with no type -> will make calcSizeBinary() segfault.
  301. if(dst[queueSize].value.value.type)
  302. queueSize++;
  303. }
  304. return queueSize;
  305. }
  306. void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
  307. MonitoredItem_queuedValue *val, *val_tmp;
  308. TAILQ_FOREACH_SAFE(val, &monitoredItem->queue, listEntry, val_tmp) {
  309. TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
  310. UA_Variant_deleteMembers(&val->value);
  311. UA_free(val);
  312. }
  313. monitoredItem->queueSize.currentValue = 0;
  314. }
  315. UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, const UA_Node *src,
  316. UA_Variant *dst) {
  317. UA_Boolean samplingError = UA_TRUE;
  318. UA_DataValue sourceDataValue;
  319. UA_DataValue_init(&sourceDataValue);
  320. const UA_VariableNode *srcAsVariableNode = (const UA_VariableNode*)src;
  321. // FIXME: Not all attributeIDs can be monitored yet
  322. switch(attributeID) {
  323. case UA_ATTRIBUTEID_NODEID:
  324. UA_Variant_setScalarCopy(dst, (const UA_NodeId*)&src->nodeId, &UA_TYPES[UA_TYPES_NODEID]);
  325. samplingError = UA_FALSE;
  326. break;
  327. case UA_ATTRIBUTEID_NODECLASS:
  328. UA_Variant_setScalarCopy(dst, (const UA_Int32*)&src->nodeClass, &UA_TYPES[UA_TYPES_INT32]);
  329. samplingError = UA_FALSE;
  330. break;
  331. case UA_ATTRIBUTEID_BROWSENAME:
  332. UA_Variant_setScalarCopy(dst, (const UA_String*)&src->browseName, &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
  333. samplingError = UA_FALSE;
  334. break;
  335. case UA_ATTRIBUTEID_DISPLAYNAME:
  336. UA_Variant_setScalarCopy(dst, (const UA_String*)&src->displayName, &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
  337. samplingError = UA_FALSE;
  338. break;
  339. case UA_ATTRIBUTEID_DESCRIPTION:
  340. UA_Variant_setScalarCopy(dst, (const UA_String*)&src->displayName, &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
  341. samplingError = UA_FALSE;
  342. break;
  343. case UA_ATTRIBUTEID_WRITEMASK:
  344. UA_Variant_setScalarCopy(dst, (const UA_String*)&src->writeMask, &UA_TYPES[UA_TYPES_UINT32]);
  345. samplingError = UA_FALSE;
  346. break;
  347. case UA_ATTRIBUTEID_USERWRITEMASK:
  348. UA_Variant_setScalarCopy(dst, (const UA_String*)&src->writeMask, &UA_TYPES[UA_TYPES_UINT32]);
  349. samplingError = UA_FALSE;
  350. break;
  351. case UA_ATTRIBUTEID_ISABSTRACT:
  352. break;
  353. case UA_ATTRIBUTEID_SYMMETRIC:
  354. break;
  355. case UA_ATTRIBUTEID_INVERSENAME:
  356. break;
  357. case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
  358. break;
  359. case UA_ATTRIBUTEID_EVENTNOTIFIER:
  360. break;
  361. case UA_ATTRIBUTEID_VALUE:
  362. if(src->nodeClass == UA_NODECLASS_VARIABLE) {
  363. const UA_VariableNode *vsrc = (const UA_VariableNode*)src;
  364. if(srcAsVariableNode->valueSource == UA_VALUESOURCE_VARIANT) {
  365. UA_Variant_copy(&vsrc->value.variant.value, dst);
  366. //no onRead callback here since triggered by a subscription
  367. samplingError = UA_FALSE;
  368. } else {
  369. if(srcAsVariableNode->valueSource != UA_VALUESOURCE_DATASOURCE)
  370. break;
  371. // todo: handle numeric ranges
  372. if(srcAsVariableNode->value.dataSource.read(vsrc->value.dataSource.handle, vsrc->nodeId, UA_TRUE, UA_NULL,
  373. &sourceDataValue) != UA_STATUSCODE_GOOD)
  374. break;
  375. UA_Variant_copy(&sourceDataValue.value, dst);
  376. if(sourceDataValue.value.data) {
  377. UA_deleteMembers(sourceDataValue.value.data, sourceDataValue.value.type);
  378. UA_free(sourceDataValue.value.data);
  379. sourceDataValue.value.data = UA_NULL;
  380. }
  381. UA_DataValue_deleteMembers(&sourceDataValue);
  382. samplingError = UA_FALSE;
  383. }
  384. }
  385. break;
  386. case UA_ATTRIBUTEID_DATATYPE:
  387. break;
  388. case UA_ATTRIBUTEID_VALUERANK:
  389. break;
  390. case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
  391. break;
  392. case UA_ATTRIBUTEID_ACCESSLEVEL:
  393. break;
  394. case UA_ATTRIBUTEID_USERACCESSLEVEL:
  395. break;
  396. case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
  397. break;
  398. case UA_ATTRIBUTEID_HISTORIZING:
  399. break;
  400. case UA_ATTRIBUTEID_EXECUTABLE:
  401. break;
  402. case UA_ATTRIBUTEID_USEREXECUTABLE:
  403. break;
  404. default:
  405. break;
  406. }
  407. return samplingError;
  408. }
  409. void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem) {
  410. UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
  411. size_t encodingOffset = 0;
  412. if(!monitoredItem || monitoredItem->lastSampled + monitoredItem->samplingInterval > UA_DateTime_now())
  413. return;
  414. // FIXME: Actively suppress non change value based monitoring. There should be
  415. // another function to handle status and events.
  416. if(monitoredItem->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
  417. return;
  418. MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
  419. if(!newvalue)
  420. return;
  421. newvalue->listEntry.tqe_next = UA_NULL;
  422. newvalue->listEntry.tqe_prev = UA_NULL;
  423. UA_Variant_init(&newvalue->value);
  424. // Verify that the *Node being monitored is still valid
  425. // Looking up the in the nodestore is only necessary if we suspect that it is changed during writes
  426. // e.g. in multithreaded applications
  427. const UA_Node *target = UA_NodeStore_get(server->nodestore, &monitoredItem->monitoredNodeId);
  428. if(!target) {
  429. UA_free(newvalue);
  430. return;
  431. }
  432. UA_Boolean samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->attributeID, target,
  433. &newvalue->value);
  434. UA_NodeStore_release(target);
  435. if(samplingError != UA_FALSE || !newvalue->value.type) {
  436. UA_Variant_deleteMembers(&newvalue->value);
  437. UA_free(newvalue);
  438. return;
  439. }
  440. if(monitoredItem->queueSize.currentValue >= monitoredItem->queueSize.maxValue) {
  441. if(monitoredItem->discardOldest != UA_TRUE) {
  442. // We cannot remove the oldest value and theres no queue space left. We're done here.
  443. UA_free(newvalue);
  444. return;
  445. }
  446. MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
  447. TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
  448. UA_free(queueItem);
  449. monitoredItem->queueSize.currentValue--;
  450. }
  451. // encode the data to find if its different to the previous
  452. newValueAsByteString.length = UA_calcSizeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_VARIANT]);
  453. newValueAsByteString.data = UA_malloc(newValueAsByteString.length);
  454. UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_VARIANT], &newValueAsByteString, &encodingOffset);
  455. if(!monitoredItem->lastSampledValue.data) {
  456. UA_ByteString_copy(&newValueAsByteString, &monitoredItem->lastSampledValue);
  457. TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
  458. monitoredItem->queueSize.currentValue++;
  459. monitoredItem->lastSampled = UA_DateTime_now();
  460. UA_free(newValueAsByteString.data);
  461. } else {
  462. if(UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue) == UA_TRUE) {
  463. UA_Variant_deleteMembers(&newvalue->value);
  464. UA_free(newvalue);
  465. UA_String_deleteMembers(&newValueAsByteString);
  466. return;
  467. }
  468. UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
  469. monitoredItem->lastSampledValue = newValueAsByteString;
  470. TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
  471. monitoredItem->queueSize.currentValue++;
  472. monitoredItem->lastSampled = UA_DateTime_now();
  473. }
  474. }