ua_services_subscription.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. #include "ua_server_internal.h"
  2. #include "ua_services.h"
  3. #include "ua_subscription.h"
  /* Clamp SRC into the inclusive range [BOUNDS.min, BOUNDS.max] and store the
   * result in DST.
   *
   * BOUNDS is any lvalue with `min` and `max` members; SRC and DST are
   * expressions of a type comparable with those members. The body is wrapped in
   * do { } while(0) so that the macro followed by a semicolon forms exactly one
   * statement (safe inside an unbraced if/else). All arguments are
   * parenthesised; note that SRC may still be evaluated up to three times, so
   * it must not have side effects. */
  #define UA_BOUNDEDVALUE_SETWBOUNDS(BOUNDS, SRC, DST) do {       \
          if((SRC) > (BOUNDS).max) (DST) = (BOUNDS).max;          \
          else if((SRC) < (BOUNDS).min) (DST) = (BOUNDS).min;     \
          else (DST) = (SRC);                                     \
      } while(0)
  9. static void
  10. setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
  11. UA_Double requestedPublishingInterval,
  12. UA_UInt32 requestedLifetimeCount,
  13. UA_UInt32 requestedMaxKeepAliveCount,
  14. UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
  15. Subscription_unregisterPublishJob(server, subscription);
  16. UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
  17. requestedPublishingInterval, subscription->publishingInterval);
  18. UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
  19. requestedMaxKeepAliveCount, subscription->maxKeepAliveCount);
  20. UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
  21. requestedLifetimeCount, subscription->lifeTime);
  22. if(subscription->lifeTime < 3 * subscription->maxKeepAliveCount)
  23. subscription->lifeTime = 3 * subscription->maxKeepAliveCount;
  24. subscription->notificationsPerPublish = maxNotificationsPerPublish;
  25. subscription->priority = priority;
  26. Subscription_registerPublishJob(server, subscription);
  27. }
  28. void Service_CreateSubscription(UA_Server *server, UA_Session *session,
  29. const UA_CreateSubscriptionRequest *request,
  30. UA_CreateSubscriptionResponse *response) {
  31. response->subscriptionId = UA_Session_getUniqueSubscriptionID(session);
  32. UA_Subscription *newSubscription = UA_Subscription_new(session, response->subscriptionId);
  33. if(!newSubscription) {
  34. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  35. return;
  36. }
  37. UA_Session_addSubscription(session, newSubscription);
  38. newSubscription->publishingEnabled = request->publishingEnabled;
  39. setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
  40. request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
  41. request->maxNotificationsPerPublish, request->priority);
  42. response->revisedPublishingInterval = newSubscription->publishingInterval;
  43. response->revisedLifetimeCount = newSubscription->lifeTime;
  44. response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;
  45. }
  46. void Service_ModifySubscription(UA_Server *server, UA_Session *session,
  47. const UA_ModifySubscriptionRequest *request,
  48. UA_ModifySubscriptionResponse *response) {
  49. UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
  50. if(!sub) {
  51. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  52. return;
  53. }
  54. setSubscriptionSettings(server, sub, request->requestedPublishingInterval,
  55. request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
  56. request->maxNotificationsPerPublish, request->priority);
  57. response->revisedPublishingInterval = sub->publishingInterval;
  58. response->revisedLifetimeCount = sub->lifeTime;
  59. response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
  60. return;
  61. }
  62. void Service_SetPublishingMode(UA_Server *server, UA_Session *session,
  63. const UA_SetPublishingModeRequest *request, UA_SetPublishingModeResponse *response) {
  64. if (request->subscriptionIdsSize <= 0) {
  65. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
  66. return;
  67. }
  68. size_t size = request->subscriptionIdsSize;
  69. response->results = UA_Array_new(size, &UA_TYPES[UA_TYPES_STATUSCODE]);
  70. if(!response->results) {
  71. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  72. return;
  73. }
  74. response->resultsSize = size;
  75. for(size_t i = 0; i < size; i++) {
  76. UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionIds[i]);
  77. if(!sub) {
  78. response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  79. continue;
  80. }
  81. sub->publishingEnabled = request->publishingEnabled;
  82. }
  83. }
  84. static void
  85. setMonitoredItemSettings(UA_Server *server, UA_MonitoredItem *mon,
  86. UA_MonitoringMode monitoringMode, UA_UInt32 clientHandle,
  87. UA_Double samplingInterval, UA_UInt32 queueSize, UA_Boolean discardOldest) {
  88. MonitoredItem_unregisterSampleJob(server, mon);
  89. mon->monitoringMode = monitoringMode;
  90. mon->clientHandle = clientHandle;
  91. UA_BOUNDEDVALUE_SETWBOUNDS(server->config.samplingIntervalLimits,
  92. samplingInterval, mon->samplingInterval);
  93. UA_BOUNDEDVALUE_SETWBOUNDS(server->config.queueSizeLimits,
  94. queueSize, mon->maxQueueSize);
  95. mon->discardOldest = discardOldest;
  96. MonitoredItem_registerSampleJob(server, mon);
  97. }
  98. static void
  99. Service_CreateMonitoredItems_single(UA_Server *server, UA_Session *session, UA_Subscription *sub,
  100. const UA_MonitoredItemCreateRequest *request,
  101. UA_MonitoredItemCreateResult *result) {
  102. const UA_Node *target = UA_NodeStore_get(server->nodestore, &request->itemToMonitor.nodeId);
  103. if(!target) {
  104. result->statusCode = UA_STATUSCODE_BADNODEIDINVALID;
  105. return;
  106. }
  107. UA_MonitoredItem *newMon = UA_MonitoredItem_new();
  108. if(!newMon) {
  109. result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
  110. return;
  111. }
  112. UA_StatusCode retval = UA_NodeId_copy(&target->nodeId, &newMon->monitoredNodeId);
  113. if(retval != UA_STATUSCODE_GOOD) {
  114. result->statusCode = retval;
  115. MonitoredItem_delete(server, newMon);
  116. return;
  117. }
  118. newMon->subscription = sub;
  119. newMon->attributeID = request->itemToMonitor.attributeId;
  120. newMon->itemId = UA_Session_getUniqueSubscriptionID(session);
  121. setMonitoredItemSettings(server, newMon, MONITOREDITEM_TYPE_CHANGENOTIFY,
  122. request->requestedParameters.clientHandle,
  123. request->requestedParameters.samplingInterval,
  124. request->requestedParameters.queueSize,
  125. request->requestedParameters.discardOldest);
  126. result->revisedSamplingInterval = newMon->samplingInterval;
  127. result->revisedQueueSize = newMon->maxQueueSize;
  128. result->monitoredItemId = newMon->itemId;
  129. LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
  130. }
  131. void
  132. Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
  133. const UA_CreateMonitoredItemsRequest *request,
  134. UA_CreateMonitoredItemsResponse *response) {
  135. UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
  136. if(!sub) {
  137. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  138. return;
  139. }
  140. if(request->itemsToCreateSize <= 0) {
  141. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
  142. return;
  143. }
  144. response->results = UA_Array_new(request->itemsToCreateSize,
  145. &UA_TYPES[UA_TYPES_MONITOREDITEMCREATERESULT]);
  146. if(!response->results) {
  147. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  148. return;
  149. }
  150. response->resultsSize = request->itemsToCreateSize;
  151. for(size_t i = 0; i < request->itemsToCreateSize; i++)
  152. Service_CreateMonitoredItems_single(server, session, sub,
  153. &request->itemsToCreate[i],
  154. &response->results[i]);
  155. }
  156. static void
  157. Service_ModifyMonitoredItems_single(UA_Server *server, UA_Session *session, UA_Subscription *sub,
  158. const UA_MonitoredItemModifyRequest *request,
  159. UA_MonitoredItemModifyResult *result) {
  160. UA_MonitoredItem *mon = UA_Subscription_getMonitoredItem(sub, request->monitoredItemId);
  161. if(!mon) {
  162. result->statusCode = UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
  163. return;
  164. }
  165. setMonitoredItemSettings(server, mon, MONITOREDITEM_TYPE_CHANGENOTIFY,
  166. request->requestedParameters.clientHandle,
  167. request->requestedParameters.samplingInterval,
  168. request->requestedParameters.queueSize,
  169. request->requestedParameters.discardOldest);
  170. result->revisedSamplingInterval = mon->samplingInterval;
  171. result->revisedQueueSize = mon->maxQueueSize;
  172. }
  173. void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
  174. const UA_ModifyMonitoredItemsRequest *request,
  175. UA_ModifyMonitoredItemsResponse *response) {
  176. UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
  177. if(!sub) {
  178. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  179. return;
  180. }
  181. if(request->itemsToModifySize <= 0) {
  182. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
  183. return;
  184. }
  185. response->results = UA_Array_new(request->itemsToModifySize,
  186. &UA_TYPES[UA_TYPES_MONITOREDITEMMODIFYRESULT]);
  187. if(!response->results) {
  188. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  189. return;
  190. }
  191. response->resultsSize = request->itemsToModifySize;
  192. for(size_t i = 0; i < request->itemsToModifySize; i++)
  193. Service_ModifyMonitoredItems_single(server, session, sub,
  194. &request->itemsToModify[i],
  195. &response->results[i]);
  196. }
  197. void
  198. Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest *request,
  199. UA_UInt32 requestId) {
  200. /* Return an error if the session has no subscription */
  201. if(LIST_EMPTY(&session->serverSubscriptions)) {
  202. UA_PublishResponse response;
  203. UA_PublishResponse_init(&response);
  204. response.responseHeader.requestHandle = request->requestHeader.requestHandle;
  205. response.responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
  206. UA_SecureChannel_sendBinaryMessage(session->channel, requestId, &response,
  207. &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
  208. return;
  209. }
  210. // todo error handling for malloc
  211. UA_PublishResponseEntry *entry = UA_malloc(sizeof(UA_PublishResponseEntry));
  212. entry->requestId = requestId;
  213. UA_PublishResponse *response = &entry->response;
  214. UA_PublishResponse_init(response);
  215. response->responseHeader.requestHandle = request->requestHeader.requestHandle;
  216. /* Delete Acknowledged Subscription Messages */
  217. response->results = UA_malloc(request->subscriptionAcknowledgementsSize * sizeof(UA_StatusCode));
  218. if(!response->results) {
  219. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  220. return;
  221. }
  222. response->resultsSize = request->subscriptionAcknowledgementsSize;
  223. for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; i++) {
  224. UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i];
  225. UA_Subscription *sub = UA_Session_getSubscriptionByID(session, ack->subscriptionId);
  226. if(!sub) {
  227. response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  228. UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
  229. "Cannot process acknowledgements subscription %u", ack->subscriptionId);
  230. continue;
  231. }
  232. response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
  233. UA_NotificationMessageEntry *pre, *pre_tmp;
  234. LIST_FOREACH_SAFE(pre, &sub->retransmissionQueue, listEntry, pre_tmp) {
  235. if(pre->message.sequenceNumber == ack->sequenceNumber) {
  236. LIST_REMOVE(pre, listEntry);
  237. response->results[i] = UA_STATUSCODE_GOOD;
  238. UA_NotificationMessage_deleteMembers(&pre->message);
  239. UA_free(pre);
  240. break;
  241. }
  242. }
  243. }
  244. /* Queue the publish response */
  245. SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
  246. UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
  247. "Queued a publication message on session %u", session->authenticationToken.identifier.numeric);
  248. }
  249. void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
  250. const UA_DeleteSubscriptionsRequest *request,
  251. UA_DeleteSubscriptionsResponse *response) {
  252. response->results = UA_malloc(sizeof(UA_StatusCode) * request->subscriptionIdsSize);
  253. if(!response->results) {
  254. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  255. return;
  256. }
  257. response->resultsSize = request->subscriptionIdsSize;
  258. for(size_t i = 0; i < request->subscriptionIdsSize; i++)
  259. response->results[i] = UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
  260. }
  261. void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
  262. const UA_DeleteMonitoredItemsRequest *request,
  263. UA_DeleteMonitoredItemsResponse *response) {
  264. UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
  265. if(!sub) {
  266. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  267. return;
  268. }
  269. response->results = UA_malloc(sizeof(UA_StatusCode) * request->monitoredItemIdsSize);
  270. if(!response->results) {
  271. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  272. return;
  273. }
  274. response->resultsSize = request->monitoredItemIdsSize;
  275. for(size_t i = 0; i < request->monitoredItemIdsSize; i++)
  276. response->results[i] = UA_Subscription_deleteMonitoredItem(server, sub, request->monitoredItemIds[i]);
  277. }
  278. void Service_Republish(UA_Server *server, UA_Session *session, const UA_RepublishRequest *request,
  279. UA_RepublishResponse *response) {
  280. /* get the subscription */
  281. UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
  282. if (!sub) {
  283. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  284. return;
  285. }
  286. /* Find the notification in the retransmission queue */
  287. UA_NotificationMessageEntry *entry;
  288. LIST_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
  289. if(entry->message.sequenceNumber == request->retransmitSequenceNumber)
  290. break;
  291. }
  292. if(entry)
  293. response->responseHeader.serviceResult =
  294. UA_NotificationMessage_copy(&entry->message, &response->notificationMessage);
  295. else
  296. response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE;
  297. }