ua_subscription.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2015-2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2015 (c) Chris Iatrou
  7. * Copyright 2015-2016 (c) Sten Grüner
  8. * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2015 (c) Joakim L. Gilje
  10. * Copyright 2016-2017 (c) Florian Palm
  11. * Copyright 2015-2016 (c) Oleksiy Vasylyev
  12. * Copyright 2017 (c) frax2222
  13. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  14. * Copyright 2017 (c) Mattias Bornhager
  15. */
  16. #include "ua_server_internal.h"
  17. #include "ua_subscription.h"
  18. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  19. void UA_Notification_delete(UA_Notification *n) {
  20. if(n->mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  21. UA_DataValue_deleteMembers(&n->data.value);
  22. } else {
  23. // TODO: Event-Handling
  24. }
  25. UA_free(n);
  26. }
  27. UA_Subscription *
  28. UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId) {
  29. /* Allocate the memory */
  30. UA_Subscription *newSub =
  31. (UA_Subscription*)UA_calloc(1, sizeof(UA_Subscription));
  32. if(!newSub)
  33. return NULL;
  34. /* Remaining members are covered by calloc zeroing out the memory */
  35. newSub->session = session;
  36. newSub->subscriptionId = subscriptionId;
  37. newSub->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
  38. TAILQ_INIT(&newSub->retransmissionQueue);
  39. TAILQ_INIT(&newSub->notificationQueue);
  40. return newSub;
  41. }
  42. void
  43. UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
  44. UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
  45. "Delete the subscription", sub->subscriptionId);
  46. Subscription_unregisterPublishCallback(server, sub);
  47. /* Delete monitored Items */
  48. UA_MonitoredItem *mon, *tmp_mon;
  49. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmp_mon) {
  50. LIST_REMOVE(mon, listEntry);
  51. UA_MonitoredItem_delete(server, mon);
  52. }
  53. sub->monitoredItemsSize = 0;
  54. /* Delete Retransmission Queue */
  55. UA_NotificationMessageEntry *nme, *nme_tmp;
  56. TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue, listEntry, nme_tmp) {
  57. TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry);
  58. UA_NotificationMessage_deleteMembers(&nme->message);
  59. UA_free(nme);
  60. }
  61. sub->retransmissionQueueSize = 0;
  62. }
  63. UA_MonitoredItem *
  64. UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId) {
  65. UA_MonitoredItem *mon;
  66. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  67. if(mon->monitoredItemId == monitoredItemId)
  68. break;
  69. }
  70. return mon;
  71. }
  72. UA_StatusCode
  73. UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
  74. UA_UInt32 monitoredItemId) {
  75. /* Find the MonitoredItem */
  76. UA_MonitoredItem *mon;
  77. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  78. if(mon->monitoredItemId == monitoredItemId)
  79. break;
  80. }
  81. if(!mon)
  82. return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
  83. /* Remove the MonitoredItem */
  84. LIST_REMOVE(mon, listEntry);
  85. sub->monitoredItemsSize--;
  86. /* Remove content and delayed free */
  87. UA_MonitoredItem_delete(server, mon);
  88. return UA_STATUSCODE_GOOD;
  89. }
  90. void
  91. UA_Subscription_addMonitoredItem(UA_Subscription *sub, UA_MonitoredItem *newMon) {
  92. sub->monitoredItemsSize++;
  93. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  94. }
  95. static void
  96. UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub,
  97. UA_NotificationMessageEntry *entry) {
  98. /* Release the oldest entry if there is not enough space */
  99. if(server->config.maxRetransmissionQueueSize > 0 &&
  100. sub->retransmissionQueueSize >= server->config.maxRetransmissionQueueSize) {
  101. UA_NotificationMessageEntry *lastentry =
  102. TAILQ_LAST(&sub->retransmissionQueue, ListOfNotificationMessages);
  103. TAILQ_REMOVE(&sub->retransmissionQueue, lastentry, listEntry);
  104. --sub->retransmissionQueueSize;
  105. UA_NotificationMessage_deleteMembers(&lastentry->message);
  106. UA_free(lastentry);
  107. }
  108. /* Add entry */
  109. TAILQ_INSERT_HEAD(&sub->retransmissionQueue, entry, listEntry);
  110. ++sub->retransmissionQueueSize;
  111. }
  112. UA_StatusCode
  113. UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber) {
  114. /* Find the retransmission message */
  115. UA_NotificationMessageEntry *entry;
  116. TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
  117. if(entry->message.sequenceNumber == sequenceNumber)
  118. break;
  119. }
  120. if(!entry)
  121. return UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
  122. /* Remove the retransmission message */
  123. TAILQ_REMOVE(&sub->retransmissionQueue, entry, listEntry);
  124. --sub->retransmissionQueueSize;
  125. UA_NotificationMessage_deleteMembers(&entry->message);
  126. UA_free(entry);
  127. return UA_STATUSCODE_GOOD;
  128. }
  129. /* Iterate over the monitoreditems of the subscription, starting at mon, and
  130. * move notifications into the response. */
  131. static void
  132. moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItemNotification *mins,
  133. size_t minsSize) {
  134. size_t pos = 0;
  135. UA_Notification *notification, *notification_tmp;
  136. TAILQ_FOREACH_SAFE(notification, &sub->notificationQueue, globalEntry, notification_tmp) {
  137. if(pos >= minsSize)
  138. return;
  139. UA_MonitoredItem *mon = notification->mon;
  140. /* Remove the notification from the queues */
  141. TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
  142. TAILQ_REMOVE(&mon->queue, notification, listEntry);
  143. --mon->queueSize;
  144. --sub->notificationQueueSize;
  145. /* Move the content to the response */
  146. UA_MonitoredItemNotification *min = &mins[pos];
  147. min->clientHandle = mon->clientHandle;
  148. if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
  149. min->value = notification->data.value;
  150. } else {
  151. /* TODO implementation for events */
  152. }
  153. UA_free(notification);
  154. ++pos;
  155. }
  156. }
  157. static UA_StatusCode
  158. prepareNotificationMessage(UA_Subscription *sub, UA_NotificationMessage *message,
  159. size_t notifications) {
  160. /* Array of ExtensionObject to hold different kinds of notifications
  161. * (currently only DataChangeNotifications) */
  162. message->notificationData = UA_ExtensionObject_new();
  163. if(!message->notificationData)
  164. return UA_STATUSCODE_BADOUTOFMEMORY;
  165. message->notificationDataSize = 1;
  166. /* Allocate Notification */
  167. UA_DataChangeNotification *dcn = UA_DataChangeNotification_new();
  168. if(!dcn) {
  169. UA_NotificationMessage_deleteMembers(message);
  170. return UA_STATUSCODE_BADOUTOFMEMORY;
  171. }
  172. UA_ExtensionObject *data = message->notificationData;
  173. data->encoding = UA_EXTENSIONOBJECT_DECODED;
  174. data->content.decoded.data = dcn;
  175. data->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
  176. /* Allocate array of notifications */
  177. dcn->monitoredItems = (UA_MonitoredItemNotification *)
  178. UA_Array_new(notifications,
  179. &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
  180. if(!dcn->monitoredItems) {
  181. UA_NotificationMessage_deleteMembers(message);
  182. return UA_STATUSCODE_BADOUTOFMEMORY;
  183. }
  184. dcn->monitoredItemsSize = notifications;
  185. /* Move notifications into the response .. the point of no return */
  186. moveNotificationsFromMonitoredItems(sub, dcn->monitoredItems, notifications);
  187. return UA_STATUSCODE_GOOD;
  188. }
  189. /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) The value 0 is
  190. * never used for the sequence number */
  191. static UA_UInt32
  192. UA_Subscription_nextSequenceNumber(UA_UInt32 sequenceNumber) {
  193. UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
  194. if(nextSequenceNumber == 0)
  195. nextSequenceNumber = 1;
  196. return nextSequenceNumber;
  197. }
  198. static void
  199. publishCallback(UA_Server *server, UA_Subscription *sub) {
  200. sub->readyNotifications = sub->notificationQueueSize;
  201. UA_Subscription_publish(server, sub);
  202. }
  203. void
  204. UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
  205. UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
  206. "Publish Callback", sub->subscriptionId);
  207. /* Dequeue a response */
  208. UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(sub->session);
  209. if(pre) {
  210. sub->currentLifetimeCount = 0; /* Reset the LifetimeCounter */
  211. } else {
  212. UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
  213. "Subscription %u | The publish queue is empty",
  214. sub->subscriptionId);
  215. ++sub->currentLifetimeCount;
  216. if(sub->currentLifetimeCount > sub->lifeTimeCount) {
  217. UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
  218. "Subscription %u | End of lifetime "
  219. "for subscription", sub->subscriptionId);
  220. UA_Session_deleteSubscription(server, sub->session, sub->subscriptionId);
  221. /* TODO: send a StatusChangeNotification with Bad_Timeout */
  222. return;
  223. }
  224. }
  225. if (sub->readyNotifications > sub->notificationQueueSize)
  226. sub->readyNotifications = sub->notificationQueueSize;
  227. /* Count the available notifications */
  228. UA_UInt32 notifications = sub->readyNotifications;
  229. if(!sub->publishingEnabled)
  230. notifications = 0;
  231. UA_Boolean moreNotifications = false;
  232. if(notifications > sub->notificationsPerPublish) {
  233. notifications = sub->notificationsPerPublish;
  234. moreNotifications = true;
  235. }
  236. /* Return if no notifications and no keepalive */
  237. if(notifications == 0) {
  238. ++sub->currentKeepAliveCount;
  239. if(sub->currentKeepAliveCount < sub->maxKeepAliveCount) {
  240. if(pre)
  241. UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
  242. return;
  243. }
  244. UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
  245. "Subscription %u | Sending a KeepAlive",
  246. sub->subscriptionId);
  247. }
  248. /* We want to send a response. Is it possible? */
  249. UA_SecureChannel *channel = sub->session->header.channel;
  250. if(!channel || !pre) {
  251. UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
  252. "Subscription %u | Want to send a publish response but can't. "
  253. "The subscription is late.", sub->subscriptionId);
  254. sub->state = UA_SUBSCRIPTIONSTATE_LATE;
  255. if(pre)
  256. UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
  257. return;
  258. }
  259. /* Prepare the response */
  260. UA_PublishResponse *response = &pre->response;
  261. UA_NotificationMessage *message = &response->notificationMessage;
  262. UA_NotificationMessageEntry *retransmission = NULL;
  263. if(notifications > 0) {
  264. /* Allocate the retransmission entry */
  265. retransmission = (UA_NotificationMessageEntry*)UA_malloc(sizeof(UA_NotificationMessageEntry));
  266. if(!retransmission) {
  267. UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
  268. "Subscription %u | Could not allocate memory for retransmission. "
  269. "The subscription is late.", sub->subscriptionId);
  270. sub->state = UA_SUBSCRIPTIONSTATE_LATE;
  271. UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
  272. return;
  273. }
  274. /* Prepare the response */
  275. UA_StatusCode retval = prepareNotificationMessage(sub, message, notifications);
  276. if(retval != UA_STATUSCODE_GOOD) {
  277. UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
  278. "Subscription %u | Could not prepare the notification message. "
  279. "The subscription is late.", sub->subscriptionId);
  280. UA_free(retransmission);
  281. sub->state = UA_SUBSCRIPTIONSTATE_LATE;
  282. UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
  283. return;
  284. }
  285. }
  286. /* <-- The point of no return --> */
  287. /* Adjust the number of ready notifications */
  288. UA_assert(sub->readyNotifications >= notifications);
  289. sub->readyNotifications -= notifications;
  290. /* Set up the response */
  291. response->responseHeader.timestamp = UA_DateTime_now();
  292. response->subscriptionId = sub->subscriptionId;
  293. response->moreNotifications = moreNotifications;
  294. message->publishTime = response->responseHeader.timestamp;
  295. /* Set the sequence number. The sequence number will be reused if there are
  296. * no notifications (and this is a keepalive message). */
  297. message->sequenceNumber = UA_Subscription_nextSequenceNumber(sub->sequenceNumber);
  298. if(notifications != 0) {
  299. /* There are notifications. So we can't reuse the sequence number. */
  300. sub->sequenceNumber = message->sequenceNumber;
  301. /* Put the notification message into the retransmission queue. This
  302. * needs to be done here, so that the message itself is included in the
  303. * available sequence numbers for acknowledgement. */
  304. retransmission->message = response->notificationMessage;
  305. UA_Subscription_addRetransmissionMessage(server, sub, retransmission);
  306. }
  307. /* Get the available sequence numbers from the retransmission queue */
  308. size_t available = sub->retransmissionQueueSize;
  309. UA_STACKARRAY(UA_UInt32, seqNumbers, available);
  310. if(available > 0) {
  311. response->availableSequenceNumbers = seqNumbers;
  312. response->availableSequenceNumbersSize = available;
  313. size_t i = 0;
  314. UA_NotificationMessageEntry *nme;
  315. TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
  316. response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
  317. ++i;
  318. }
  319. }
  320. /* Send the response */
  321. UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
  322. "Subscription %u | Sending out a publish response "
  323. "with %u notifications", sub->subscriptionId,
  324. (UA_UInt32)notifications);
  325. UA_SecureChannel_sendSymmetricMessage(sub->session->header.channel, pre->requestId,
  326. UA_MESSAGETYPE_MSG, response,
  327. &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
  328. /* Reset subscription state to normal */
  329. sub->state = UA_SUBSCRIPTIONSTATE_NORMAL;
  330. sub->currentKeepAliveCount = 0;
  331. /* Free the response */
  332. UA_Array_delete(response->results, response->resultsSize, &UA_TYPES[UA_TYPES_UINT32]);
  333. UA_free(pre); /* No need for UA_PublishResponse_deleteMembers */
  334. /* Repeat sending responses if there are more notifications to send */
  335. if(moreNotifications)
  336. UA_Subscription_publish(server, sub);
  337. }
  338. UA_Boolean
  339. UA_Subscription_reachedPublishReqLimit(UA_Server *server, UA_Session *session) {
  340. UA_LOG_DEBUG_SESSION(server->config.logger, session, "Reached number of publish request limit");
  341. /* Dequeue a response */
  342. UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(session);
  343. /* Cannot publish without a response */
  344. if(!pre) {
  345. UA_LOG_FATAL_SESSION(server->config.logger, session, "No publish requests available");
  346. return false;
  347. }
  348. /* <-- The point of no return --> */
  349. UA_PublishResponse *response = &pre->response;
  350. UA_NotificationMessage *message = &response->notificationMessage;
  351. /* Set up the response. Note that this response has no related subscription id */
  352. response->responseHeader.timestamp = UA_DateTime_now();
  353. response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS;
  354. response->subscriptionId = 0;
  355. response->moreNotifications = false;
  356. message->publishTime = response->responseHeader.timestamp;
  357. message->sequenceNumber = 0;
  358. response->availableSequenceNumbersSize = 0;
  359. /* Send the response */
  360. UA_LOG_DEBUG_SESSION(server->config.logger, session,
  361. "Sending out a publish response triggered by too many publish requests");
  362. UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId,
  363. UA_MESSAGETYPE_MSG, response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
  364. /* Free the response */
  365. UA_Array_delete(response->results, response->resultsSize, &UA_TYPES[UA_TYPES_UINT32]);
  366. UA_free(pre); /* no need for UA_PublishResponse_deleteMembers */
  367. return true;
  368. }
  369. UA_StatusCode
  370. Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
  371. UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
  372. "Subscription %u | Register subscription "
  373. "publishing callback", sub->subscriptionId);
  374. if(sub->publishCallbackIsRegistered)
  375. return UA_STATUSCODE_GOOD;
  376. UA_StatusCode retval =
  377. UA_Server_addRepeatedCallback(server, (UA_ServerCallback)publishCallback,
  378. sub, (UA_UInt32)sub->publishingInterval, &sub->publishCallbackId);
  379. if(retval != UA_STATUSCODE_GOOD)
  380. return retval;
  381. sub->publishCallbackIsRegistered = true;
  382. return UA_STATUSCODE_GOOD;
  383. }
  384. UA_StatusCode
  385. Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub) {
  386. UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
  387. "Unregister subscription publishing callback", sub->subscriptionId);
  388. if(!sub->publishCallbackIsRegistered)
  389. return UA_STATUSCODE_GOOD;
  390. UA_StatusCode retval = UA_Server_removeRepeatedCallback(server, sub->publishCallbackId);
  391. if(retval != UA_STATUSCODE_GOOD)
  392. return retval;
  393. sub->publishCallbackIsRegistered = false;
  394. return UA_STATUSCODE_GOOD;
  395. }
  396. /* When the session has publish requests stored but the last subscription is
  397. * deleted... Send out empty responses */
  398. void
  399. UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_Session *session) {
  400. /* No session or there are remaining subscriptions */
  401. if(!session || LIST_FIRST(&session->serverSubscriptions))
  402. return;
  403. /* Send a response for every queued request */
  404. UA_PublishResponseEntry *pre;
  405. while((pre = UA_Session_dequeuePublishReq(session))) {
  406. UA_PublishResponse *response = &pre->response;
  407. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
  408. response->responseHeader.timestamp = UA_DateTime_now();
  409. UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId, UA_MESSAGETYPE_MSG,
  410. response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
  411. UA_PublishResponse_deleteMembers(response);
  412. UA_free(pre);
  413. }
  414. }
  415. #endif /* UA_ENABLE_SUBSCRIPTIONS */