ua_subscription_manager.c 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. #ifdef ENABLESUBSCRIPTIONS
  2. #include "ua_types.h"
  3. #include "ua_server_internal.h"
  4. #include "ua_nodestore.h"
  5. #include "ua_subscription_manager.h"
  6. void SubscriptionManager_init(UA_Session *session) {
  7. UA_SubscriptionManager *manager = &(session->subscriptionManager);
  8. /* FIXME: These init values are empirical. Maybe they should be part
  9. * of the server config? */
  10. manager->GlobalPublishingInterval = (UA_Int32_BoundedValue) { .maxValue = 100, .minValue = 0, .currentValue=0 };
  11. manager->GlobalLifeTimeCount = (UA_UInt32_BoundedValue) { .maxValue = 15000, .minValue = 0, .currentValue=0 };
  12. manager->GlobalKeepAliveCount = (UA_UInt32_BoundedValue) { .maxValue = 100, .minValue = 0, .currentValue=0 };
  13. manager->GlobalNotificationsPerPublish = (UA_Int32_BoundedValue) { .maxValue = 1000, .minValue = 1, .currentValue=0 };
  14. manager->GlobalSamplingInterval = (UA_UInt32_BoundedValue) { .maxValue = 100, .minValue = 0, .currentValue=0 };
  15. manager->GlobalQueueSize = (UA_UInt32_BoundedValue) { .maxValue = 100, .minValue = 0, .currentValue=0 };
  16. manager->ServerSubscriptions = (UA_ListOfUASubscriptions *) malloc (sizeof(UA_ListOfUASubscriptions));
  17. LIST_INIT(manager->ServerSubscriptions);
  18. manager->LastSessionID = (UA_UInt32) UA_DateTime_now();
  19. return;
  20. }
  21. UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
  22. UA_Subscription *new = (UA_Subscription *) malloc(sizeof(UA_Subscription));
  23. new->SubscriptionID = SubscriptionID;
  24. new->LastPublished = 0;
  25. new->SequenceNumber = 0;
  26. new->MonitoredItems = (UA_ListOfUAMonitoredItems *) malloc (sizeof(UA_ListOfUAMonitoredItems));
  27. LIST_INIT(new->MonitoredItems);
  28. LIST_INITENTRY(new, listEntry);
  29. new->unpublishedNotifications = (UA_ListOfUnpublishedNotifications *) malloc(sizeof(UA_ListOfUnpublishedNotifications));
  30. LIST_INIT(new->unpublishedNotifications);
  31. return new;
  32. }
  33. UA_MonitoredItem *UA_MonitoredItem_new() {
  34. UA_MonitoredItem *new = (UA_MonitoredItem *) malloc(sizeof(UA_MonitoredItem));
  35. new->queue = (UA_ListOfQueuedDataValues *) malloc (sizeof(UA_ListOfQueuedDataValues));
  36. new->QueueSize = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
  37. new->LastSampled = 0;
  38. // FIXME: This is currently hardcoded;
  39. new->MonitoredItemType = MONITOREDITEM_CHANGENOTIFY_T;
  40. LIST_INIT(new->queue);
  41. LIST_INITENTRY(new, listEntry);
  42. INITPOINTER(new->monitoredNode);
  43. INITPOINTER(new->LastSampledValue.data );
  44. return new;
  45. }
  46. void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *newSubscription) {
  47. LIST_INSERT_HEAD(manager->ServerSubscriptions, newSubscription, listEntry);
  48. return;
  49. }
  50. UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID) {
  51. UA_Subscription *retsub, *sub;
  52. retsub = (UA_Subscription *) NULL;
  53. for (sub = (manager->ServerSubscriptions)->lh_first; sub != NULL; sub = sub->listEntry.le_next) {
  54. if (sub->SubscriptionID == SubscriptionID) {
  55. retsub = sub;
  56. break;
  57. }
  58. }
  59. return retsub;
  60. }
  61. UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID, UA_UInt32 MonitoredItemID) {
  62. UA_Subscription *sub;
  63. UA_MonitoredItem *mon;
  64. if (manager == NULL) return UA_STATUSCODE_BADINTERNALERROR;
  65. sub = SubscriptionManager_getSubscriptionByID(manager, SubscriptionID);
  66. if (sub == NULL) return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  67. for(mon=(sub->MonitoredItems)->lh_first; mon != NULL; mon=(mon->listEntry).le_next) {
  68. if (mon->ItemId == MonitoredItemID) {
  69. MonitoredItem_delete(mon);
  70. return UA_STATUSCODE_GOOD;
  71. }
  72. }
  73. return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
  74. }
  75. void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
  76. if (monitoredItem == NULL) return;
  77. // Delete Queued Data
  78. MonitoredItem_ClearQueue(monitoredItem);
  79. // Remove from subscription list
  80. LIST_REMOVE(monitoredItem, listEntry);
  81. // Release comparison sample
  82. if(monitoredItem->LastSampledValue.data != NULL) {
  83. UA_free(monitoredItem->LastSampledValue.data);
  84. }
  85. UA_free(monitoredItem);
  86. return;
  87. }
  88. UA_Int32 SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID) {
  89. UA_Subscription *sub;
  90. UA_MonitoredItem *mon;
  91. UA_unpublishedNotification *notify;
  92. sub = SubscriptionManager_getSubscriptionByID(manager, SubscriptionID);
  93. if (sub != NULL) {
  94. // Delete registered subscriptions
  95. while (sub->MonitoredItems->lh_first != NULL) {
  96. mon = sub->MonitoredItems->lh_first;
  97. // Delete Sampled data
  98. MonitoredItem_delete(mon);
  99. }
  100. }
  101. else {
  102. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  103. }
  104. // Delete queued notification messages
  105. notify = sub->unpublishedNotifications->lh_first;
  106. while (sub->unpublishedNotifications->lh_first != NULL) {
  107. notify = sub->unpublishedNotifications->lh_first;
  108. LIST_REMOVE(notify, listEntry);
  109. UA_free(notify);
  110. }
  111. LIST_REMOVE(sub, listEntry);
  112. UA_free(sub);
  113. return UA_STATUSCODE_GOOD;
  114. }
  115. UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
  116. UA_UInt32 j = 0;
  117. if (subscription == NULL) return 0;
  118. for(UA_unpublishedNotification *i = subscription->unpublishedNotifications->lh_first; i != NULL; i=(i->listEntry).le_next) j++;
  119. return j;
  120. }
  121. void Subscription_generateKeepAlive(UA_Subscription *subscription) {
  122. UA_unpublishedNotification *msg = NULL;
  123. if (subscription->KeepAliveCount.currentValue <= subscription->KeepAliveCount.minValue || subscription->KeepAliveCount.currentValue > subscription->KeepAliveCount.maxValue) {
  124. msg = (UA_unpublishedNotification *) malloc(sizeof(UA_unpublishedNotification));
  125. LIST_INITENTRY(msg, listEntry);
  126. INITPOINTER(msg->notification);
  127. msg->notification = (UA_NotificationMessage *) malloc(sizeof(UA_NotificationMessage));
  128. INITPOINTER(msg->notification->notificationData);
  129. msg->notification->sequenceNumber = (subscription->SequenceNumber)+1; // KeepAlive uses next message, but does not increment counter
  130. msg->notification->publishTime = UA_DateTime_now();
  131. msg->notification->notificationDataSize = 0;
  132. LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
  133. subscription->KeepAliveCount.currentValue = subscription->KeepAliveCount.maxValue;
  134. }
  135. return;
  136. }
  137. void Subscription_updateNotifications(UA_Subscription *subscription) {
  138. UA_MonitoredItem *mon;
  139. //MonitoredItem_queuedValue *queuedValue;
  140. UA_unpublishedNotification *msg = NULL;
  141. UA_UInt32 monItemsChangeT = 0, monItemsStatusT = 0, monItemsEventT = 0;
  142. UA_DataChangeNotification *changeNotification;
  143. size_t notificationOffset;
  144. if (subscription == NULL) return;
  145. if ((subscription->LastPublished + subscription->PublishingInterval) > UA_DateTime_now()) return;
  146. // Make sure there is data to be published and establish which message types
  147. // will need to be generated
  148. for(mon=subscription->MonitoredItems->lh_first; mon!= NULL; mon=mon->listEntry.le_next) {
  149. // Check if this MonitoredItems Queue holds data and how much data is held in total
  150. if (mon->queue->lh_first != NULL) {
  151. if ((mon->MonitoredItemType & MONITOREDITEM_CHANGENOTIFY_T) != 0) monItemsChangeT+=mon->QueueSize.currentValue;
  152. else if ((mon->MonitoredItemType & MONITOREDITEM_STATUSNOTIFY_T) != 0) monItemsStatusT+=mon->QueueSize.currentValue;
  153. else if ((mon->MonitoredItemType & MONITOREDITEM_EVENTNOTIFY_T) != 0) monItemsEventT+=mon->QueueSize.currentValue;
  154. }
  155. }
  156. // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
  157. if (Subscription_queuedNotifications(subscription) >= 10) {
  158. // Remove last entry
  159. for(msg = subscription->unpublishedNotifications->lh_first; (msg->listEntry).le_next != NULL; msg=(msg->listEntry).le_next);
  160. LIST_REMOVE(msg, listEntry);
  161. UA_free(msg);
  162. }
  163. if (monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
  164. // Decrement KeepAlive
  165. subscription->KeepAliveCount.currentValue--;
  166. // +- Generate KeepAlive msg if counter overruns
  167. Subscription_generateKeepAlive(subscription);
  168. return;
  169. }
  170. msg = (UA_unpublishedNotification *) malloc(sizeof(UA_unpublishedNotification));
  171. LIST_INITENTRY(msg, listEntry);
  172. INITPOINTER(msg->notification);
  173. msg->notification = (UA_NotificationMessage *) malloc(sizeof(UA_NotificationMessage));
  174. INITPOINTER(msg->notification->notificationData);
  175. msg->notification->sequenceNumber = subscription->SequenceNumber++;
  176. msg->notification->publishTime = UA_DateTime_now();
  177. // NotificationData is an array of Change, Status and Event messages, each containing the appropriate
  178. // list of Queued values from all monitoredItems of that type
  179. msg->notification->notificationDataSize = ISNOTZERO(monItemsChangeT);// + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
  180. msg->notification->notificationData = (UA_ExtensionObject *) malloc(sizeof(UA_ExtensionObject) * msg->notification->notificationDataSize);
  181. for(int notmsgn=0; notmsgn < msg->notification->notificationDataSize; notmsgn++) {
  182. // Set the notification message type and encoding for each of
  183. // the three possible NotificationData Types
  184. (msg->notification->notificationData)[notmsgn].encoding = 1; // Encoding is always binary
  185. (msg->notification->notificationData)[notmsgn].typeId = UA_NODEID_NUMERIC(0, 811);
  186. if(notmsgn == 0) {
  187. // Construct a DataChangeNotification
  188. changeNotification = (UA_DataChangeNotification *) malloc(sizeof(UA_DataChangeNotification));
  189. // Create one DataChangeNotification for each queue item held in each monitoredItems queue:
  190. changeNotification->monitoredItems = (UA_MonitoredItemNotification *) malloc(sizeof(UA_MonitoredItemNotification) * monItemsChangeT);
  191. // Scan all monitoredItems in this subscription and have their queue transformed into an Array of
  192. // the propper NotificationMessageType (Status, Change, Event)
  193. monItemsChangeT = 0;
  194. for(mon=subscription->MonitoredItems->lh_first; mon != NULL; mon=mon->listEntry.le_next) {
  195. if (mon->MonitoredItemType != MONITOREDITEM_CHANGENOTIFY_T || mon->queue->lh_first == NULL ) continue;
  196. // Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
  197. monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications( &((changeNotification->monitoredItems)[monItemsChangeT]), mon);
  198. MonitoredItem_ClearQueue(mon);
  199. }
  200. changeNotification->monitoredItemsSize = monItemsChangeT;
  201. changeNotification->diagnosticInfosSize = 0;
  202. changeNotification->diagnosticInfos = NULL;
  203. (msg->notification->notificationData[notmsgn]).body.length = UA_calcSizeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
  204. (msg->notification->notificationData[notmsgn]).body.data = malloc((msg->notification->notificationData[notmsgn]).body.length);
  205. notificationOffset = 0;
  206. UA_encodeBinary((const void *) changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION], &(msg->notification->notificationData[notmsgn].body), &notificationOffset);
  207. UA_free(changeNotification->monitoredItems);
  208. UA_free(changeNotification);
  209. }
  210. else if (notmsgn == 1) {
  211. // FIXME: Constructing a StatusChangeNotification is not implemented
  212. }
  213. else if (notmsgn == 2) {
  214. // FIXME: Constructing a EventListNotification is not implemented
  215. }
  216. }
  217. LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
  218. return;
  219. }
  220. int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst, UA_MonitoredItem *monitoredItem) {
  221. int queueSize = 0;
  222. MonitoredItem_queuedValue *queueItem;
  223. // Count instead of relying on the items currentValue
  224. for (queueItem = monitoredItem->queue->lh_first; queueItem != NULL; queueItem=queueItem->listEntry.le_next) {
  225. dst[queueSize].clientHandle = monitoredItem->ClientHandle;
  226. dst[queueSize].value.hasServerPicoseconds = UA_FALSE;
  227. dst[queueSize].value.hasServerTimestamp = UA_FALSE;
  228. dst[queueSize].value.serverTimestamp = UA_FALSE;
  229. dst[queueSize].value.hasSourcePicoseconds = UA_FALSE;
  230. dst[queueSize].value.hasSourceTimestamp = UA_FALSE;
  231. dst[queueSize].value.hasValue = UA_TRUE;
  232. dst[queueSize].value.status = UA_STATUSCODE_GOOD;
  233. UA_Variant_copy(&(queueItem->value), &(dst[queueSize].value.value));
  234. // Do not create variants with no type -> will make calcSizeBinary() segfault.
  235. if(dst[queueSize].value.value.type == NULL) {
  236. queueSize--;
  237. };
  238. queueSize++;
  239. }
  240. if (queueSize == 0) return 0;
  241. return queueSize;
  242. }
  243. void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
  244. MonitoredItem_queuedValue *val;
  245. if (monitoredItem == NULL) return;
  246. while(monitoredItem->queue->lh_first != NULL) {
  247. val = monitoredItem->queue->lh_first;
  248. LIST_REMOVE(monitoredItem->queue->lh_first, listEntry);
  249. UA_free(val);
  250. }
  251. (monitoredItem->QueueSize).currentValue = 0;
  252. return;
  253. }
  254. UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, const UA_Node *src, UA_Variant *dst) {
  255. UA_Boolean samplingError = UA_TRUE;
  256. UA_DataValue sourceDataValue;
  257. const UA_VariableNode *srcAsVariableNode = (const UA_VariableNode *) src;
  258. // FIXME: Not all AttributeIDs can be monitored yet
  259. switch(AttributeID) {
  260. case UA_ATTRIBUTEID_NODEID:
  261. UA_Variant_setScalarCopy(dst, (const UA_NodeId *) &(src->nodeId), &UA_TYPES[UA_TYPES_NODEID]);
  262. samplingError = UA_FALSE;
  263. break;
  264. case UA_ATTRIBUTEID_NODECLASS:
  265. UA_Variant_setScalarCopy(dst, (const UA_Int32 *) &(src->nodeClass), &UA_TYPES[UA_TYPES_INT32]);
  266. samplingError = UA_FALSE;
  267. break;
  268. case UA_ATTRIBUTEID_BROWSENAME:
  269. UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->browseName), &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
  270. samplingError = UA_FALSE;
  271. break;
  272. case UA_ATTRIBUTEID_DISPLAYNAME:
  273. UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
  274. samplingError = UA_FALSE;
  275. break;
  276. case UA_ATTRIBUTEID_DESCRIPTION:
  277. UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
  278. samplingError = UA_FALSE;
  279. break;
  280. case UA_ATTRIBUTEID_WRITEMASK:
  281. UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
  282. samplingError = UA_FALSE;
  283. break;
  284. case UA_ATTRIBUTEID_USERWRITEMASK:
  285. UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
  286. samplingError = UA_FALSE;
  287. break;
  288. case UA_ATTRIBUTEID_ISABSTRACT:
  289. break;
  290. case UA_ATTRIBUTEID_SYMMETRIC:
  291. break;
  292. case UA_ATTRIBUTEID_INVERSENAME:
  293. break;
  294. case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
  295. break;
  296. case UA_ATTRIBUTEID_EVENTNOTIFIER:
  297. break;
  298. case UA_ATTRIBUTEID_VALUE:
  299. if (src->nodeClass == UA_NODECLASS_VARIABLE) {
  300. if ( srcAsVariableNode->valueSource == UA_VALUESOURCE_VARIANT) {
  301. UA_Variant_copy( (const UA_Variant *) &((const UA_VariableNode *) src)->value, dst);
  302. samplingError = UA_FALSE;
  303. }
  304. else if (srcAsVariableNode->valueSource == UA_VALUESOURCE_DATASOURCE) {
  305. if (srcAsVariableNode->value.dataSource.read(((const UA_VariableNode *) src)->value.dataSource.handle, (UA_Boolean) UA_TRUE, &sourceDataValue) == UA_STATUSCODE_GOOD) {
  306. UA_Variant_copy( (const UA_Variant *) &(sourceDataValue.value), dst);
  307. samplingError = UA_FALSE;
  308. }
  309. }
  310. }
  311. break;
  312. case UA_ATTRIBUTEID_DATATYPE:
  313. break;
  314. case UA_ATTRIBUTEID_VALUERANK:
  315. break;
  316. case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
  317. break;
  318. case UA_ATTRIBUTEID_ACCESSLEVEL:
  319. break;
  320. case UA_ATTRIBUTEID_USERACCESSLEVEL:
  321. break;
  322. case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
  323. break;
  324. case UA_ATTRIBUTEID_HISTORIZING:
  325. break;
  326. case UA_ATTRIBUTEID_EXECUTABLE:
  327. break;
  328. case UA_ATTRIBUTEID_USEREXECUTABLE:
  329. break;
  330. default:
  331. break;
  332. }
  333. return samplingError;
  334. }
  335. void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
  336. MonitoredItem_queuedValue *newvalue = NULL, *queueItem = NULL;
  337. UA_Boolean samplingError = UA_TRUE;
  338. UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
  339. size_t encodingOffset = 0;
  340. if (monitoredItem == NULL) return;
  341. if( (monitoredItem->LastSampled + monitoredItem->SamplingInterval) > UA_DateTime_now()) {
  342. return;
  343. };
  344. // FIXME: Actively suppress non change value based monitoring. There should be another function to handle status and events.
  345. if (monitoredItem->MonitoredItemType != MONITOREDITEM_CHANGENOTIFY_T) {
  346. return;
  347. }
  348. newvalue = (MonitoredItem_queuedValue *) malloc(sizeof(MonitoredItem_queuedValue));
  349. LIST_INITENTRY(newvalue,listEntry);
  350. newvalue->value.arrayLength = 0;
  351. newvalue->value.arrayDimensionsSize = 0;
  352. newvalue->value.arrayDimensions = NULL;
  353. newvalue->value.data = NULL;
  354. newvalue->value.type = NULL;
  355. samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->AttributeID, monitoredItem->monitoredNode, &(newvalue->value));
  356. if ((monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
  357. if (newvalue->value.type != NULL && monitoredItem->DiscardOldest == UA_TRUE && monitoredItem->queue->lh_first != NULL ) {
  358. for(queueItem = monitoredItem->queue->lh_first; queueItem->listEntry.le_next != NULL; queueItem = queueItem->listEntry.le_next) {}
  359. LIST_REMOVE(queueItem, listEntry);
  360. UA_free(queueItem);
  361. (monitoredItem->QueueSize).currentValue--;
  362. }
  363. else {
  364. // We cannot remove the oldest value and theres no queue space left. We're done here.
  365. UA_free(newvalue);
  366. return;
  367. }
  368. }
  369. // Only add a value if we have sampled it correctly and it fits into the queue;
  370. if ( samplingError != UA_FALSE || newvalue->value.type == NULL || (monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
  371. UA_free(newvalue);
  372. return;
  373. }
  374. newValueAsByteString.length = UA_calcSizeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT]);
  375. newValueAsByteString.data = malloc(newValueAsByteString.length);
  376. UA_encodeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT], &(newValueAsByteString), &encodingOffset );
  377. if(monitoredItem->LastSampledValue.data == NULL) {
  378. UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
  379. LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
  380. (monitoredItem->QueueSize).currentValue++;
  381. monitoredItem->LastSampled = UA_DateTime_now();
  382. }
  383. else {
  384. if (UA_String_equal((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue)) == UA_TRUE) {
  385. UA_free(newValueAsByteString.data);
  386. return;
  387. }
  388. UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
  389. LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
  390. (monitoredItem->QueueSize).currentValue++;
  391. monitoredItem->LastSampled = UA_DateTime_now();
  392. }
  393. return;
  394. }
  395. UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
  396. UA_UInt32 *seqArray;
  397. int i;
  398. UA_unpublishedNotification *not;
  399. if (sub == NULL) return NULL;
  400. seqArray = (UA_UInt32 *) malloc(sizeof(UA_UInt32) * Subscription_queuedNotifications(sub));
  401. if (seqArray == NULL ) return NULL;
  402. i = 0;
  403. for(not = sub->unpublishedNotifications->lh_first; not != NULL; not=(not->listEntry).le_next) {
  404. seqArray[i] = not->notification->sequenceNumber;
  405. i++;
  406. }
  407. return seqArray;
  408. }
  409. void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub) {
  410. UA_NotificationMessage *latest;
  411. if (dst == NULL) return;
  412. if (Subscription_queuedNotifications(sub) == 0) {
  413. dst->notificationDataSize = 0;
  414. dst->publishTime = UA_DateTime_now();
  415. dst->sequenceNumber = 0;
  416. return;
  417. }
  418. latest = sub->unpublishedNotifications->lh_first->notification;
  419. dst->notificationDataSize = latest->notificationDataSize;
  420. dst->publishTime = latest->publishTime;
  421. dst->sequenceNumber = latest->sequenceNumber;
  422. if (latest->notificationDataSize == 0) return;
  423. dst->notificationData = (UA_ExtensionObject *) malloc(sizeof(UA_ExtensionObject));
  424. dst->notificationData->encoding = latest->notificationData->encoding;
  425. dst->notificationData->typeId = latest->notificationData->typeId;
  426. dst->notificationData->body.length = latest->notificationData->body.length;
  427. dst->notificationData->body.data = malloc(latest->notificationData->body.length);
  428. UA_ByteString_copy((UA_String *) &(latest->notificationData->body), (UA_String *) &(dst->notificationData->body));
  429. return;
  430. }
  431. UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
  432. UA_unpublishedNotification *not;
  433. UA_UInt32 deletedItems = 0;
  434. for(not=sub->unpublishedNotifications->lh_first; not != NULL; not=not->listEntry.le_next) {
  435. if (not->notification->sequenceNumber == seqNo) {
  436. LIST_REMOVE(not, listEntry);
  437. if (not->notification != NULL) {
  438. if (not->notification->notificationData != NULL) {
  439. if (not->notification->notificationData->body.data != NULL) {
  440. UA_free(not->notification->notificationData->body.data);
  441. }
  442. UA_free(not->notification->notificationData);
  443. }
  444. UA_free(not->notification);
  445. }
  446. UA_free(not);
  447. deletedItems++;
  448. }
  449. }
  450. return deletedItems;
  451. }
  452. #endif //#ifdef ENABLESUBSCRIPTIONS