ua_pubsub_manager.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. */
  7. #include "ua_pubsub_manager.h"
  8. #include "server/ua_server_internal.h"
  9. #define UA_DATETIMESTAMP_2000 125911584000000000
  10. /**
  11. * Add new Connection to the current PubSub configuration.
  12. *
  13. * @param server
  14. * @param connectionConfig config of the new Connection
  15. * @param connectionIdentifier nodeId of the generated Connection (NULL if not needed)
  16. * @return UA_STATUSCODE_GOOD if success
  17. */
  18. UA_StatusCode
  19. UA_Server_addPubSubConnection(UA_Server *server, const UA_PubSubConnectionConfig *connectionConfig,
  20. UA_NodeId *connectionIdentifier) {
  21. //iterate over the available UA_PubSubTransportLayers
  22. for(size_t i = 0; i < server->config.pubsubTransportLayersSize; i++) {
  23. if(connectionConfig && UA_String_equal(&server->config.pubsubTransportLayers[i].transportProfileUri,
  24. &connectionConfig->transportProfileUri)){
  25. //create new connection config
  26. UA_PubSubConnectionConfig *tmpConnectionConfig = (UA_PubSubConnectionConfig *)
  27. UA_calloc(1, sizeof(UA_PubSubConnectionConfig));
  28. if(!tmpConnectionConfig){
  29. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  30. "PubSub Connection creation failed. Out of Memory.");
  31. return UA_STATUSCODE_BADOUTOFMEMORY;
  32. }
  33. //deep copy the given connection config
  34. if(UA_PubSubConnectionConfig_copy(connectionConfig, tmpConnectionConfig) != UA_STATUSCODE_GOOD){
  35. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  36. "PubSub Connection creation failed. Config copy problem.");
  37. return UA_STATUSCODE_BADOUTOFMEMORY;
  38. }
  39. //create new connection and add to UA_PubSubManager
  40. UA_PubSubConnection *newConnectionsField = (UA_PubSubConnection *)
  41. UA_realloc(server->pubSubManager.connections,
  42. sizeof(UA_PubSubConnection) * (server->pubSubManager.connectionsSize + 1));
  43. if(!newConnectionsField) {
  44. UA_PubSubConnectionConfig_deleteMembers(tmpConnectionConfig);
  45. UA_free(tmpConnectionConfig);
  46. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  47. "PubSub Connection creation failed. Out of Memory.");
  48. return UA_STATUSCODE_BADOUTOFMEMORY;
  49. }
  50. server->pubSubManager.connections = newConnectionsField;
  51. UA_PubSubConnection *newConnection = &server->pubSubManager.connections[server->pubSubManager.connectionsSize];
  52. memset(newConnection, 0, sizeof(UA_PubSubConnection));
  53. newConnection->config = tmpConnectionConfig;
  54. newConnection->channel = server->config.pubsubTransportLayers[i].createPubSubChannel(newConnection->config);
  55. if(!newConnection->channel){
  56. UA_PubSubConnection_delete(newConnection);
  57. if(server->pubSubManager.connectionsSize > 0){
  58. newConnectionsField = (UA_PubSubConnection *)
  59. UA_realloc(server->pubSubManager.connections,
  60. sizeof(UA_PubSubConnection) * (server->pubSubManager.connectionsSize));
  61. if(!newConnectionsField) {
  62. return UA_STATUSCODE_BADINTERNALERROR;
  63. }
  64. server->pubSubManager.connections = newConnectionsField;
  65. } else {
  66. UA_free(newConnectionsField);
  67. server->pubSubManager.connections = NULL;
  68. }
  69. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  70. "PubSub Connection creation failed. Transport layer creation problem.");
  71. return UA_STATUSCODE_BADINTERNALERROR;
  72. }
  73. UA_PubSubManager_generateUniqueNodeId(server, &newConnection->identifier);
  74. if(connectionIdentifier != NULL){
  75. UA_NodeId_copy(&newConnection->identifier, connectionIdentifier);
  76. }
  77. server->pubSubManager.connectionsSize++;
  78. return UA_STATUSCODE_GOOD;
  79. }
  80. }
  81. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  82. "PubSub Connection creation failed. Requested transport layer not found.");
  83. return UA_STATUSCODE_BADNOTFOUND;
  84. }
  85. /**
  86. * Remove Connection, identified by the NodeId. Deletion of Connection
  87. * removes all contained WriterGroups and Writers.
  88. *
  89. * @param server
  90. * @param connectionIdentifier
  91. * @return UA_STATUSCODE_GOOD on success
  92. */
  93. UA_StatusCode UA_Server_removePubSubConnection(UA_Server *server, UA_NodeId connectionIdentifier) {
  94. //search the identified Connection and store the Connection index
  95. size_t connectionIndex;
  96. UA_PubSubConnection *currentConnection = NULL;
  97. for(connectionIndex = 0; connectionIndex < server->pubSubManager.connectionsSize; connectionIndex++){
  98. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[connectionIndex].identifier)){
  99. currentConnection = &server->pubSubManager.connections[connectionIndex];
  100. break;
  101. }
  102. }
  103. if(!currentConnection){
  104. return UA_STATUSCODE_BADNOTFOUND;
  105. }
  106. UA_PubSubConnection_delete(currentConnection);
  107. server->pubSubManager.connectionsSize--;
  108. //remove the connection from the pubSubManager, move the last connection into the allocated memory of the deleted connection
  109. if(server->pubSubManager.connectionsSize != connectionIndex){
  110. memcpy(&server->pubSubManager.connections[connectionIndex],
  111. &server->pubSubManager.connections[server->pubSubManager.connectionsSize], sizeof(UA_PubSubConnection));
  112. }
  113. if(server->pubSubManager.connectionsSize <= 0){
  114. UA_free(server->pubSubManager.connections);
  115. server->pubSubManager.connections = NULL;
  116. } else {
  117. server->pubSubManager.connections = (UA_PubSubConnection *)
  118. UA_realloc(server->pubSubManager.connections, sizeof(UA_PubSubConnection) * server->pubSubManager.connectionsSize);
  119. if(!server->pubSubManager.connections){
  120. return UA_STATUSCODE_BADINTERNALERROR;
  121. }
  122. }
  123. return UA_STATUSCODE_GOOD;
  124. }
  125. /**
  126. * Add PublishedDataSet to the current PubSub configuration.
  127. *
  128. * @param server
  129. * @param publishedDataSetConfig config of the new PDS
  130. * @param pdsIdentifier nodeId of the generated PDS (NULL if not needed)
  131. * @return UA_STATUSCODE_GOOD on success
  132. */
  133. UA_AddPublishedDataSetResult
  134. UA_Server_addPublishedDataSet(UA_Server *server, const UA_PublishedDataSetConfig *publishedDataSetConfig,
  135. UA_NodeId *pdsIdentifier) {
  136. if(!publishedDataSetConfig){
  137. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  138. "PublishedDataSet creation failed. No config passed in.");
  139. return (UA_AddPublishedDataSetResult) {UA_STATUSCODE_BADINVALIDARGUMENT, 0, NULL, {0, 0}};
  140. }
  141. if(publishedDataSetConfig->publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  142. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  143. "PublishedDataSet creation failed. Unsupported PublishedDataSet type.");
  144. return (UA_AddPublishedDataSetResult) {UA_STATUSCODE_BADINVALIDARGUMENT, 0, NULL, {0, 0}};
  145. }
  146. //deep copy the given connection config
  147. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  148. memset(&tmpPublishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
  149. if(UA_PublishedDataSetConfig_copy(publishedDataSetConfig, &tmpPublishedDataSetConfig) != UA_STATUSCODE_GOOD){
  150. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  151. "PublishedDataSet creation failed. Configuration copy failed.");
  152. return (UA_AddPublishedDataSetResult) {UA_STATUSCODE_BADINTERNALERROR, 0, NULL, {0, 0}};
  153. }
  154. //create new PDS and add to UA_PubSubManager
  155. UA_PublishedDataSet *newPubSubDataSetField = (UA_PublishedDataSet *)
  156. UA_realloc(server->pubSubManager.publishedDataSets,
  157. sizeof(UA_PublishedDataSet) * (server->pubSubManager.publishedDataSetsSize + 1));
  158. if(!newPubSubDataSetField) {
  159. UA_PublishedDataSetConfig_deleteMembers(&tmpPublishedDataSetConfig);
  160. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
  161. "PublishedDataSet creation failed. Out of Memory.");
  162. return (UA_AddPublishedDataSetResult) {UA_STATUSCODE_BADOUTOFMEMORY, 0, NULL, {0, 0}};
  163. }
  164. server->pubSubManager.publishedDataSets = newPubSubDataSetField;
  165. UA_PublishedDataSet *newPubSubDataSet = &server->pubSubManager.publishedDataSets[server->pubSubManager.publishedDataSetsSize];
  166. memset(newPubSubDataSet, 0, sizeof(UA_PublishedDataSet));
  167. newPubSubDataSet->config = tmpPublishedDataSetConfig;
  168. if(tmpPublishedDataSetConfig.publishedDataSetType == UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE){
  169. //parse template config and add fields (later PubSub batch)
  170. }
  171. //generate unique nodeId
  172. UA_PubSubManager_generateUniqueNodeId(server, &newPubSubDataSet->identifier);
  173. if(pdsIdentifier != NULL){
  174. UA_NodeId_copy(&newPubSubDataSet->identifier, pdsIdentifier);
  175. }
  176. server->pubSubManager.publishedDataSetsSize++;
  177. UA_AddPublishedDataSetResult result = {UA_STATUSCODE_GOOD, 0, NULL,
  178. {UA_PubSubConfigurationVersionTimeDifference(),
  179. UA_PubSubConfigurationVersionTimeDifference()}};
  180. return result;
  181. }
  182. /**
  183. * Remove PublishedDataSet, identified by the NodeId. Deletion of PDS
  184. * removes all contained and linked PDS Fields. Connected WriterGroups
  185. * will be also removed.
  186. *
  187. * @param server
  188. * @param pdsIdentifier
  189. * @return UA_STATUSCODE_GOOD on success
  190. */
  191. UA_StatusCode
  192. UA_Server_removePublishedDataSet(UA_Server *server, UA_NodeId pdsIdentifier) {
  193. //search the identified PublishedDataSet and store the PDS index
  194. UA_PublishedDataSet *publishedDataSet = NULL;
  195. size_t publishedDataSetIndex;
  196. for(publishedDataSetIndex = 0; publishedDataSetIndex < server->pubSubManager.publishedDataSetsSize; publishedDataSetIndex++){
  197. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[publishedDataSetIndex].identifier, &pdsIdentifier)){
  198. publishedDataSet = &server->pubSubManager.publishedDataSets[publishedDataSetIndex];
  199. break;
  200. };
  201. }
  202. if(!publishedDataSet){
  203. return UA_STATUSCODE_BADNOTFOUND;
  204. }
  205. UA_PublishedDataSet_delete(publishedDataSet);
  206. server->pubSubManager.publishedDataSetsSize--;
  207. //copy the last PDS to the removed PDS inside the allocated memory block
  208. if(server->pubSubManager.publishedDataSetsSize != publishedDataSetIndex){
  209. memcpy(&server->pubSubManager.publishedDataSets[publishedDataSetIndex],
  210. &server->pubSubManager.publishedDataSets[server->pubSubManager.publishedDataSetsSize],
  211. sizeof(UA_PublishedDataSet));
  212. }
  213. if(server->pubSubManager.publishedDataSetsSize <= 0){
  214. UA_free(server->pubSubManager.publishedDataSets);
  215. server->pubSubManager.publishedDataSets = NULL;
  216. } else {
  217. server->pubSubManager.publishedDataSets = (UA_PublishedDataSet *)
  218. UA_realloc(server->pubSubManager.publishedDataSets, sizeof(UA_PublishedDataSet) * server->pubSubManager.publishedDataSetsSize);
  219. if (!server->pubSubManager.publishedDataSets) {
  220. return UA_STATUSCODE_BADINTERNALERROR;
  221. }
  222. }
  223. return UA_STATUSCODE_GOOD;
  224. }
  225. /**
  226. * Calculate the time difference between current time and UTC (00:00) on January 1, 2000.
  227. */
  228. UA_UInt32
  229. UA_PubSubConfigurationVersionTimeDifference() {
  230. UA_UInt32 timeDiffSince2000 = (UA_UInt32) (UA_DateTime_now() - UA_DATETIMESTAMP_2000);
  231. return timeDiffSince2000;
  232. }
  233. /**
  234. * Generate a new unique NodeId. This NodeId will be used for the
  235. * information model representation of PubSub entities.
  236. */
  237. void
  238. UA_PubSubManager_generateUniqueNodeId(UA_Server *server, UA_NodeId *nodeId) {
  239. UA_NodeId newNodeId = UA_NODEID_NUMERIC(0, 0);
  240. UA_Node *newNode = UA_Nodestore_new(server, UA_NODECLASS_OBJECT);
  241. UA_Nodestore_insert(server, newNode, &newNodeId);
  242. UA_NodeId_copy(&newNodeId, nodeId);
  243. }
  244. /**
  245. * Delete the current PubSub configuration including all nested members. This action also delete
  246. * the configured PubSub transport Layers.
  247. *
  248. * @param server
  249. * @param pubSubManager
  250. */
  251. void
  252. UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager) {
  253. UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER, "PubSub cleanup was called.");
  254. //remove Connections and WriterGroups
  255. while(pubSubManager->connectionsSize > 0){
  256. UA_Server_removePubSubConnection(server, pubSubManager->connections[pubSubManager->connectionsSize-1].identifier);
  257. }
  258. while(pubSubManager->publishedDataSetsSize > 0){
  259. UA_Server_removePublishedDataSet(server, pubSubManager->publishedDataSets[pubSubManager->publishedDataSetsSize-1].identifier);
  260. }
  261. //free the currently configured transport layers
  262. for(size_t i = 0; i < server->config.pubsubTransportLayersSize; i++){
  263. UA_free(&server->config.pubsubTransportLayers[i]);
  264. }
  265. }