ua_services_subscription.c 11 KB


  1. #ifdef ENABLESUBSCRIPTIONS
  2. #include "ua_services.h"
  3. #include "ua_server_internal.h"
  4. #include "ua_subscription_manager.h"
  5. #include "ua_statuscodes.h"
  6. #include "ua_util.h"
  7. #include "ua_nodestore.h"
  8. #include "ua_log.h" // Remove later, debugging only
  9. #define UA_BOUNDEDVALUE_SETWBOUNDS(BOUNDS, SRC, DST) { \
  10. if (SRC > BOUNDS.maxValue) DST = BOUNDS.maxValue; \
  11. else if (SRC < BOUNDS.minValue) DST = BOUNDS.minValue; \
  12. else DST = SRC; \
  13. }
  14. UA_Int32 Service_CreateSubscription(UA_Server *server, UA_Session *session,
  15. const UA_CreateSubscriptionRequest *request,
  16. UA_CreateSubscriptionResponse *response) {
  17. UA_Subscription *newSubscription;
  18. // Verify Session
  19. response->responseHeader.serviceResult = UA_STATUSCODE_GOOD;
  20. if (session == NULL ) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;
  21. else if ( session->channel == NULL || session->activated == UA_FALSE) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
  22. if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
  23. // Create Subscription and Response
  24. response->subscriptionId = ++(session->subscriptionManager.LastSessionID);
  25. newSubscription = UA_Subscription_new(response->subscriptionId);
  26. UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalPublishingInterval, request->requestedPublishingInterval, response->revisedPublishingInterval);
  27. newSubscription->PublishingInterval = response->revisedPublishingInterval;
  28. UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalLifeTimeCount, request->requestedLifetimeCount, response->revisedLifetimeCount);
  29. newSubscription->LifeTime = (UA_UInt32_BoundedValue) { .minValue=session->subscriptionManager.GlobalLifeTimeCount.minValue, .maxValue=session->subscriptionManager.GlobalLifeTimeCount.maxValue, .currentValue=response->revisedLifetimeCount};
  30. UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalKeepAliveCount, request->requestedMaxKeepAliveCount, response->revisedMaxKeepAliveCount);
  31. newSubscription->KeepAliveCount = (UA_Int32_BoundedValue) { .minValue=session->subscriptionManager.GlobalKeepAliveCount.minValue, .maxValue=session->subscriptionManager.GlobalKeepAliveCount.maxValue, .currentValue=response->revisedMaxKeepAliveCount};
  32. newSubscription->NotificationsPerPublish = request->maxNotificationsPerPublish;
  33. newSubscription->PublishingMode = request->publishingEnabled;
  34. newSubscription->Priority = request->priority;
  35. SubscriptionManager_addSubscription(&(session->subscriptionManager), newSubscription);
  36. return (UA_Int32) 0;
  37. }
  38. UA_Int32 Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
  39. const UA_CreateMonitoredItemsRequest *request,
  40. UA_CreateMonitoredItemsResponse *response) {
  41. UA_MonitoredItem *newMon;
  42. UA_Subscription *sub;
  43. UA_MonitoredItemCreateResult *createResults, *thisItemsResult;
  44. UA_MonitoredItemCreateRequest *thisItemsRequest;
  45. // Verify Session and Subscription
  46. response->responseHeader.serviceResult = UA_STATUSCODE_GOOD;
  47. sub=SubscriptionManager_getSubscriptionByID(&(session->subscriptionManager), request->subscriptionId);
  48. if (session == NULL ) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;
  49. else if ( session->channel == NULL || session->activated == UA_FALSE) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
  50. else if ( sub == NULL) response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  51. if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
  52. // Allocate Result Array
  53. if (request->itemsToCreateSize > 0) {
  54. createResults = (UA_MonitoredItemCreateResult *) malloc(sizeof(UA_MonitoredItemCreateResult) * (request->itemsToCreateSize));
  55. if (createResults == NULL) {
  56. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  57. return 0;
  58. }
  59. }
  60. else return 0;
  61. response->resultsSize = request->itemsToCreateSize;
  62. response->results = createResults;
  63. for(int i=0;i<request->itemsToCreateSize;i++) {
  64. thisItemsRequest = &(request->itemsToCreate[i]);
  65. thisItemsResult = &(createResults[i]);
  66. // FIXME: Completely ignoring filters for now!
  67. thisItemsResult->filterResult.typeId = UA_NODEID_NULL;
  68. thisItemsResult->filterResult.encoding = UA_EXTENSIONOBJECT_ENCODINGMASK_NOBODYISENCODED;
  69. thisItemsResult->filterResult.body = UA_BYTESTRING_NULL;
  70. if (UA_NodeStore_get(server->nodestore, (const UA_NodeId *) &(thisItemsRequest->itemToMonitor.nodeId)) == UA_NULL) {
  71. thisItemsResult->statusCode = UA_STATUSCODE_BADNODEIDINVALID;
  72. thisItemsResult->monitoredItemId = 0;
  73. thisItemsResult->revisedSamplingInterval = 0;
  74. thisItemsResult->revisedQueueSize = 0;
  75. }
  76. else {
  77. thisItemsResult->statusCode = UA_STATUSCODE_GOOD;
  78. newMon = UA_MonitoredItem_new();
  79. newMon->monitoredNode = UA_NodeStore_get(server->nodestore, (const UA_NodeId *) &(thisItemsRequest->itemToMonitor.nodeId));
  80. newMon->ItemId = ++(session->subscriptionManager.LastSessionID);
  81. thisItemsResult->monitoredItemId = newMon->ItemId;
  82. newMon->ClientHandle = thisItemsRequest->requestedParameters.clientHandle;
  83. UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalSamplingInterval , thisItemsRequest->requestedParameters.samplingInterval, thisItemsResult->revisedSamplingInterval);
  84. newMon->SamplingInterval = thisItemsResult->revisedSamplingInterval;
  85. UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalQueueSize, thisItemsRequest->requestedParameters.queueSize, thisItemsResult->revisedQueueSize);
  86. newMon->QueueSize = (UA_UInt32_BoundedValue) { .maxValue=(thisItemsResult->revisedQueueSize) + 1, .minValue=0, .currentValue=0 };
  87. newMon->DiscardOldest = thisItemsRequest->requestedParameters.discardOldest;
  88. LIST_INSERT_HEAD(sub->MonitoredItems, newMon, listEntry);
  89. }
  90. }
  91. return 0;
  92. }
  93. UA_Int32 Service_Publish(UA_Server *server, UA_Session *session,
  94. const UA_PublishRequest *request,
  95. UA_PublishResponse *response) {
  96. UA_Subscription *sub;
  97. //UA_MonitoredItem *mon;
  98. UA_SubscriptionManager *manager;
  99. // Verify Session and Subscription
  100. response->responseHeader.serviceResult = UA_STATUSCODE_GOOD;
  101. response->diagnosticInfosSize = 0;
  102. response->availableSequenceNumbersSize = 0;
  103. response->resultsSize = 0;
  104. response->subscriptionId = 0;
  105. response->moreNotifications = UA_FALSE;
  106. response->notificationMessage.notificationDataSize = 0;
  107. if (session == NULL ) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;
  108. else if ( session->channel == NULL || session->activated == UA_FALSE) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
  109. if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
  110. manager = &(session->subscriptionManager);
  111. for (sub=(manager->ServerSubscriptions)->lh_first; sub != NULL; sub = sub->listEntry.le_next) {
  112. Subscription_updateNotifications(sub);
  113. if (Subscription_queuedNotifications(sub) > 0) {
  114. response->subscriptionId = sub->SubscriptionID;
  115. Subscription_copyTopNotificationMessage(&(response->notificationMessage), sub);
  116. if (sub->unpublishedNotifications->lh_first->notification->sequenceNumber > sub->SequenceNumber) {
  117. // If this is a keepalive message, its seqNo is the next seqNo to be used for an actual msg.
  118. response->availableSequenceNumbersSize = 0;
  119. // .. and must be deleted
  120. Subscription_deleteUnpublishedNotification(sub->SequenceNumber + 1, sub);
  121. }
  122. else {
  123. response->availableSequenceNumbersSize = Subscription_queuedNotifications(sub);
  124. response->availableSequenceNumbers = Subscription_getAvailableSequenceNumbers(sub);
  125. }
  126. //printf("Publish: Session Subscription %i; Queued %i, #SeqNo %i\n", sub->SubscriptionID, Subscription_queuedNotifications(sub), sub->SequenceNumber);
  127. return 0;
  128. }
  129. }
  130. response->responseHeader.serviceResult = UA_STATUSCODE_BADSERVICEUNSUPPORTED;
  131. return 0;
  132. }
  133. UA_Int32 Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
  134. const UA_DeleteSubscriptionsRequest *request,
  135. UA_DeleteSubscriptionsResponse *response) {
  136. UA_StatusCode *retStat;
  137. // Verify Session
  138. response->responseHeader.serviceResult = UA_STATUSCODE_GOOD;
  139. if (session == NULL ) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;
  140. else if ( session->channel == NULL || session->activated == UA_FALSE) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
  141. if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
  142. retStat = (UA_UInt32 *) malloc(sizeof(UA_UInt32) * request->subscriptionIdsSize);
  143. if (retStat==NULL) {
  144. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  145. return -1;
  146. }
  147. for(int i=0; i<request->subscriptionIdsSize;i++) {
  148. retStat[i] = SubscriptionManager_deleteSubscription(&(session->subscriptionManager), request->subscriptionIds[i]);
  149. }
  150. response->resultsSize = request->subscriptionIdsSize;
  151. response->results = retStat;
  152. return 0;
  153. }
  154. UA_Int32 Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
  155. const UA_DeleteMonitoredItemsRequest *request,
  156. UA_DeleteMonitoredItemsResponse *response) {
  157. UA_SubscriptionManager *manager;
  158. UA_Subscription *sub;
  159. UA_Int32 *resultCodes;
  160. // Verify Session
  161. response->responseHeader.serviceResult = UA_STATUSCODE_GOOD;
  162. if (session == NULL ) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;
  163. else if ( session->channel == NULL || session->activated == UA_FALSE) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
  164. if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
  165. response->diagnosticInfosSize=0;
  166. response->resultsSize=0;
  167. manager = &(session->subscriptionManager);
  168. sub = SubscriptionManager_getSubscriptionByID(manager, request->subscriptionId);
  169. if (sub == NULL) {
  170. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  171. return 0;
  172. }
  173. resultCodes = (UA_Int32 *) malloc(sizeof(UA_UInt32) * request->monitoredItemIdsSize);
  174. if (resultCodes == NULL) {
  175. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  176. return 0;
  177. }
  178. response->results = (UA_StatusCode *) resultCodes;
  179. response->resultsSize = request->monitoredItemIdsSize;
  180. for(int i=0; i < request->monitoredItemIdsSize; i++) {
  181. resultCodes[i] = SubscriptionManager_deleteMonitoredItem(manager, sub->SubscriptionID, (request->monitoredItemIds)[i]);
  182. }
  183. return 0;
  184. }
  185. #endif //#ifdef ENABLESUBSCRIPTIONS