ua_network_pubsub_udp.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. */
  7. /* Enable POSIX features */
  8. #if !defined(_XOPEN_SOURCE) && !defined(_WRS_KERNEL)
  9. # define _XOPEN_SOURCE 600
  10. #endif
  11. #ifndef _DEFAULT_SOURCE
  12. # define _DEFAULT_SOURCE
  13. #endif
  14. /* On older systems we need to define _BSD_SOURCE.
  15. * _DEFAULT_SOURCE is an alias for that. */
  16. #ifndef _BSD_SOURCE
  17. # define _BSD_SOURCE
  18. #endif
  19. /* Disable some security warnings on MSVC */
  20. #ifdef _MSC_VER
  21. # define _CRT_SECURE_NO_WARNINGS
  22. #endif
  23. /* Assume that Windows versions are newer than Windows XP */
  24. #if defined(__MINGW32__) && (!defined(WINVER) || WINVER < 0x501)
  25. # undef WINVER
  26. # undef _WIN32_WINDOWS
  27. # undef _WIN32_WINNT
  28. # define WINVER 0x0501
  29. # define _WIN32_WINDOWS 0x0501
  30. # define _WIN32_WINNT 0x0501
  31. #endif
  32. #ifdef _WIN32
  33. # include <winsock2.h>
  34. # include <ws2tcpip.h>
  35. # include <Iphlpapi.h>
  36. # define CLOSESOCKET(S) closesocket((SOCKET)S)
  37. # define ssize_t int
  38. # define UA_fd_set(fd, fds) FD_SET((unsigned int)fd, fds)
  39. # define UA_fd_isset(fd, fds) FD_ISSET((unsigned int)fd, fds)
  40. #else /* _WIN32 */
  41. # define CLOSESOCKET(S) close(S)
  42. #include <sys/types.h>
  43. #include <sys/socket.h>
  44. #include <netdb.h>
  45. #include <unistd.h>
  46. #include <arpa/inet.h>
  47. #include <net/if.h>
  48. # define UA_fd_set(fd, fds) FD_SET(fd, fds)
  49. # define UA_fd_isset(fd, fds) FD_ISSET(fd, fds)
  50. # endif /* Not Windows */
  51. #include <stdio.h>
  52. #include "ua_plugin_network.h"
  53. #include "ua_network_pubsub_udp.h"
  54. #include "ua_log_stdout.h"
  55. //UDP multicast network layer specific internal data
  56. typedef struct {
  57. int ai_family; //Protocol family for socket. IPv4/IPv6
  58. struct sockaddr_storage *ai_addr; //https://msdn.microsoft.com/de-de/library/windows/desktop/ms740496(v=vs.85).aspx
  59. UA_UInt32 messageTTL;
  60. UA_Boolean enableLoopback;
  61. UA_Boolean enableReuse;
  62. } UA_PubSubChannelDataUDPMC;
  63. /**
  64. * Open communication socket based on the connectionConfig. Protocol specific parameters are
  65. * provided within the connectionConfig as KeyValuePair.
  66. * Currently supported options: "ttl" , "loopback", "reuse"
  67. *
  68. * @return ref to created channel, NULL on error
  69. */
  70. static UA_PubSubChannel *
  71. UA_PubSubChannelUDPMC_open(const UA_PubSubConnectionConfig *connectionConfig) {
  72. #ifdef _WIN32
  73. WSADATA wsaData;
  74. WSAStartup(MAKEWORD(2, 2), &wsaData);
  75. #endif /* Not Windows */
  76. UA_NetworkAddressUrlDataType address;
  77. if(UA_Variant_hasScalarType(&connectionConfig->address, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE])){
  78. address = *(UA_NetworkAddressUrlDataType *)connectionConfig->address.data;
  79. } else {
  80. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Invalid Address.");
  81. return NULL;
  82. }
  83. //allocate and init memory for the UDP multicast specific internal data
  84. UA_PubSubChannelDataUDPMC * channelDataUDPMC =
  85. (UA_PubSubChannelDataUDPMC *) UA_calloc(1, (sizeof(UA_PubSubChannelDataUDPMC)));
  86. if(!channelDataUDPMC){
  87. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Out of memory.");
  88. return NULL;
  89. }
  90. //set default values
  91. memcpy(channelDataUDPMC, &(UA_PubSubChannelDataUDPMC){0, NULL, 255, UA_TRUE, UA_TRUE}, sizeof(UA_PubSubChannelDataUDPMC));
  92. //iterate over the given KeyValuePair paramters
  93. UA_String ttlParam = UA_STRING("ttl"), loopbackParam = UA_STRING("loopback"), reuseParam = UA_STRING("reuse");
  94. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  95. if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &ttlParam)){
  96. if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_UINT32])){
  97. channelDataUDPMC->messageTTL = *(UA_UInt32 *) connectionConfig->connectionProperties[i].value.data;
  98. }
  99. } else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &loopbackParam)){
  100. if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_BOOLEAN])){
  101. channelDataUDPMC->enableLoopback = *(UA_Boolean *) connectionConfig->connectionProperties[i].value.data;
  102. }
  103. } else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &reuseParam)){
  104. if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_BOOLEAN])){
  105. channelDataUDPMC->enableReuse = *(UA_Boolean *) connectionConfig->connectionProperties[i].value.data;
  106. }
  107. } else {
  108. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation. Unknown connection parameter.");
  109. }
  110. }
  111. UA_PubSubChannel *newChannel = (UA_PubSubChannel *) UA_calloc(1, sizeof(UA_PubSubChannel));
  112. if(!newChannel){
  113. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Out of memory.");
  114. UA_free(channelDataUDPMC);
  115. return NULL;
  116. }
  117. struct addrinfo hints, *rp, *requestResult = NULL;
  118. memset(&hints, 0, sizeof hints);
  119. hints.ai_family = AF_UNSPEC;
  120. hints.ai_socktype = SOCK_DGRAM;
  121. hints.ai_flags = 0;
  122. hints.ai_protocol = 0;
  123. UA_String hostname, path;
  124. UA_UInt16 networkPort;
  125. //TODO replace fallback to use the existing parseEndpointUrl function. Extend parseEndpointUrl for UDP or create own parseEndpointUrl function for PubSub.
  126. if(strncmp((char*)&address.url.data, "opc.udp://", 10) != 0){
  127. strncpy((char*)address.url.data, "opc.tcp://", 10);
  128. } else {
  129. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  130. "PubSub Connection creation failed. Invalid URL.");
  131. UA_free(channelDataUDPMC);
  132. UA_free(newChannel);
  133. return NULL;
  134. }
  135. if(UA_parseEndpointUrl(&address.url, &hostname, &networkPort, &path) != UA_STATUSCODE_GOOD){
  136. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  137. "PubSub Connection creation failed. Invalid URL.");
  138. UA_free(channelDataUDPMC);
  139. UA_free(newChannel);
  140. return NULL;
  141. }
  142. if(hostname.length > 512) {
  143. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  144. "PubSub Connection creation failed. URL maximum length is 512.");
  145. UA_free(channelDataUDPMC);
  146. UA_free(newChannel);
  147. return NULL;
  148. }
  149. UA_STACKARRAY(char, addressAsChar, sizeof(char) * hostname.length +1);
  150. memcpy(addressAsChar, hostname.data, hostname.length);
  151. addressAsChar[hostname.length] = 0;
  152. char port[6];
  153. sprintf(port, "%u", networkPort);
  154. if(getaddrinfo(addressAsChar, port, &hints, &requestResult) != 0) {
  155. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  156. "PubSub Connection creation failed. Internal error.");
  157. UA_free(channelDataUDPMC);
  158. UA_free(newChannel);
  159. return NULL;
  160. }
  161. //check if the ip address is a multicast address
  162. if(requestResult->ai_family == PF_INET){
  163. struct in_addr imr_interface;
  164. inet_pton(AF_INET, addressAsChar, &imr_interface);
  165. if((ntohl(imr_interface.s_addr) & 0xF0000000) != 0xE0000000){
  166. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  167. "PubSub Connection creation failed. No multicast address.");
  168. freeaddrinfo(requestResult);
  169. UA_free(channelDataUDPMC);
  170. UA_free(newChannel);
  171. return NULL;
  172. }
  173. } else {
  174. //TODO check if ipv6 addrr is multicast address.
  175. }
  176. for(rp = requestResult; rp != NULL; rp = rp->ai_next){
  177. newChannel->sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
  178. if(newChannel->sockfd != -1){
  179. break; /*success*/
  180. }
  181. }
  182. if(!rp){
  183. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  184. "PubSub Connection creation failed. Internal error.");
  185. freeaddrinfo(requestResult);
  186. UA_free(channelDataUDPMC);
  187. UA_free(newChannel);
  188. return NULL;
  189. }
  190. channelDataUDPMC->ai_family = rp->ai_family;
  191. channelDataUDPMC->ai_addr = (struct sockaddr_storage *) UA_calloc(1, sizeof(struct sockaddr_storage));
  192. if(!channelDataUDPMC->ai_addr){
  193. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  194. "PubSub Connection creation failed. Out of memory.");
  195. CLOSESOCKET(newChannel->sockfd);
  196. freeaddrinfo(requestResult);
  197. UA_free(channelDataUDPMC);
  198. UA_free(newChannel);
  199. return NULL;
  200. }
  201. memcpy(channelDataUDPMC->ai_addr, rp->ai_addr, sizeof(*rp->ai_addr));
  202. //link channel and internal channel data
  203. newChannel->handle = channelDataUDPMC;
  204. //Set loop back data to your host
  205. if(setsockopt(newChannel->sockfd,
  206. requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6:IPPROTO_IP,
  207. requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_LOOP : IP_MULTICAST_LOOP,
  208. (const char *)&channelDataUDPMC->enableLoopback, sizeof (channelDataUDPMC->enableLoopback)) < 0) {
  209. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  210. "PubSub Connection creation failed. Loopback setup failed.");
  211. CLOSESOCKET(newChannel->sockfd);
  212. freeaddrinfo(requestResult);
  213. UA_free(channelDataUDPMC);
  214. UA_free(newChannel);
  215. return NULL;
  216. }
  217. //Set Time to live (TTL). Value of 1 prevent forward beyond the local network.
  218. if(setsockopt(newChannel->sockfd,
  219. requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6:IPPROTO_IP,
  220. requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_HOPS : IP_MULTICAST_TTL,
  221. (const char *)&channelDataUDPMC->messageTTL, sizeof(channelDataUDPMC->messageTTL)) < 0) {
  222. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  223. "PubSub Connection creation problem. Time to live setup failed.");
  224. }
  225. //Set reuse address -> enables sharing of the same listening address on different sockets.
  226. if(channelDataUDPMC->enableReuse){
  227. int enableReuse = 1;
  228. if(setsockopt(newChannel->sockfd,
  229. SOL_SOCKET, SO_REUSEADDR,
  230. (const char*)&enableReuse, sizeof(enableReuse)) < 0){
  231. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  232. "PubSub Connection creation problem. Reuse address setup failed.");
  233. }
  234. }
  235. //Set the physical interface for outgoing traffic
  236. if(address.networkInterface.length > 0){
  237. UA_STACKARRAY(char, interfaceAsChar, sizeof(char) * address.networkInterface.length + 1);
  238. memcpy(interfaceAsChar, address.networkInterface.data, address.networkInterface.length);
  239. interfaceAsChar[address.networkInterface.length] = 0;
  240. enum{
  241. IPv4,
  242. IPv6,
  243. INVALID
  244. } ipVersion;
  245. union {
  246. struct ip_mreq ipv4;
  247. struct ipv6_mreq ipv6;
  248. } group;
  249. if(inet_pton(AF_INET, interfaceAsChar, &group.ipv4.imr_interface)){
  250. ipVersion = IPv4;
  251. } else if (inet_pton(AF_INET6, interfaceAsChar, &group.ipv6.ipv6mr_multiaddr)){
  252. group.ipv6.ipv6mr_interface = if_nametoindex(interfaceAsChar);
  253. ipVersion = IPv6;
  254. } else {
  255. ipVersion = INVALID;
  256. }
  257. if(ipVersion == INVALID ||
  258. setsockopt(newChannel->sockfd,
  259. requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP,
  260. requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_IF : IP_MULTICAST_IF,
  261. ipVersion == IPv6 ? (void *) &group.ipv6.ipv6mr_interface : &group.ipv4.imr_interface,
  262. ipVersion == IPv6 ? sizeof(group.ipv6.ipv6mr_interface) : sizeof(struct in_addr)) < 0){
  263. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  264. "PubSub Connection creation problem. Interface selection failed.");
  265. };
  266. }
  267. freeaddrinfo(requestResult);
  268. newChannel->state = UA_PUBSUB_CHANNEL_PUB;
  269. return newChannel;
  270. }
  271. /**
  272. * Subscribe to a given address.
  273. *
  274. * @return UA_STATUSCODE_GOOD on success
  275. */
  276. static UA_StatusCode
  277. UA_PubSubChannelUDPMC_regist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings) {
  278. if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_RDY)){
  279. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
  280. return UA_STATUSCODE_BADINTERNALERROR;
  281. }
  282. UA_PubSubChannelDataUDPMC * connectionConfig = (UA_PubSubChannelDataUDPMC *) channel->handle;
  283. if(connectionConfig->ai_family == PF_INET){//IPv4 handling
  284. struct sockaddr_in addr;
  285. memcpy(&addr, connectionConfig->ai_addr, sizeof(struct sockaddr_in));
  286. addr.sin_addr.s_addr = INADDR_ANY;
  287. if (bind(channel->sockfd, (const struct sockaddr *)&addr, sizeof(struct sockaddr_in)) != 0){
  288. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
  289. return UA_STATUSCODE_BADINTERNALERROR;
  290. }
  291. struct ip_mreq groupV4;
  292. memcpy(&groupV4.imr_multiaddr, &((const struct sockaddr_in *)connectionConfig->ai_addr)->sin_addr, sizeof(struct ip_mreq));
  293. groupV4.imr_interface.s_addr = htonl(INADDR_ANY);
  294. //multihomed hosts can join several groups on different IF, INADDR_ANY -> kernel decides
  295. if(setsockopt(channel->sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &groupV4, sizeof(groupV4)) != 0){
  296. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
  297. return UA_STATUSCODE_BADINTERNALERROR;
  298. }
  299. } else if (connectionConfig->ai_family == PF_INET6) {//IPv6 handling
  300. //TODO implement regist for IPv6
  301. } else {
  302. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
  303. return UA_STATUSCODE_BADINTERNALERROR;
  304. }
  305. return UA_STATUSCODE_GOOD;
  306. }
  307. /**
  308. * Remove current subscription.
  309. *
  310. * @return UA_STATUSCODE_GOOD on success
  311. */
  312. static UA_StatusCode
  313. UA_PubSubChannelUDPMC_unregist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings) {
  314. if(!(channel->state == UA_PUBSUB_CHANNEL_PUB_SUB || channel->state == UA_PUBSUB_CHANNEL_SUB)){
  315. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection unregist failed.");
  316. return UA_STATUSCODE_BADINTERNALERROR;
  317. }
  318. UA_PubSubChannelDataUDPMC * connectionConfig = (UA_PubSubChannelDataUDPMC *) channel->handle;
  319. if(connectionConfig->ai_family == PF_INET){//IPv4 handling
  320. struct ip_mreq groupV4;
  321. memcpy(&groupV4.imr_multiaddr, &((const struct sockaddr_in *)connectionConfig->ai_addr)->sin_addr, sizeof(struct ip_mreq));
  322. groupV4.imr_interface.s_addr = htonl(INADDR_ANY);
  323. if(setsockopt(channel->sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (char *) &groupV4, sizeof(groupV4)) != 0){
  324. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection unregist failed.");
  325. return UA_STATUSCODE_BADINTERNALERROR;
  326. }
  327. } else if (connectionConfig->ai_family == PF_INET6) {//IPv6 handling
  328. //TODO implement unregist for IPv6
  329. } else {
  330. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection unregist failed.");
  331. return UA_STATUSCODE_BADINTERNALERROR;
  332. }
  333. return UA_STATUSCODE_GOOD;
  334. }
  335. /**
  336. * Send messages to the connection defined address
  337. *
  338. * @return UA_STATUSCODE_GOOD if success
  339. */
  340. static UA_StatusCode
  341. UA_PubSubChannelUDPMC_send(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettigns, const UA_ByteString *buf) {
  342. UA_PubSubChannelDataUDPMC *channelConfigUDPMC = (UA_PubSubChannelDataUDPMC *) channel->handle;
  343. if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_PUB_SUB)){
  344. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection sending failed. Invalid state.");
  345. return UA_STATUSCODE_BADINTERNALERROR;
  346. }
  347. //TODO evalute: chunk messages or check against MTU?
  348. long nWritten = 0;
  349. while (nWritten < (long)buf->length) {
  350. long n = sendto(channel->sockfd, buf->data, buf->length, 0,
  351. (struct sockaddr *) channelConfigUDPMC->ai_addr, sizeof(struct sockaddr_storage));
  352. if(n == -1L) {
  353. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection sending failed.");
  354. return UA_STATUSCODE_BADINTERNALERROR;
  355. }
  356. nWritten += n;
  357. }
  358. return UA_STATUSCODE_GOOD;
  359. }
  360. /**
  361. * Receive messages. The regist function should be called before.
  362. *
  363. * @param timeout in usec | on windows platforms are only multiples of 1000usec possible
  364. * @return
  365. */
  366. static UA_StatusCode
  367. UA_PubSubChannelUDPMC_receive(UA_PubSubChannel *channel, UA_ByteString *message, UA_ExtensionObject *transportSettigns, UA_UInt32 timeout){
  368. if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_PUB_SUB)) {
  369. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection receive failed. Invalid state.");
  370. return UA_STATUSCODE_BADINTERNALERROR;
  371. }
  372. UA_PubSubChannelDataUDPMC *channelConfigUDPMC = (UA_PubSubChannelDataUDPMC *) channel->handle;
  373. if(timeout > 0) {
  374. fd_set fdset;
  375. FD_ZERO(&fdset);
  376. UA_fd_set(channel->sockfd, &fdset);
  377. struct timeval tmptv = {(long int)(timeout / 1000000),
  378. (long int)(timeout % 1000000)};
  379. int resultsize = select(channel->sockfd+1, &fdset, NULL,
  380. NULL, &tmptv);
  381. if(resultsize == 0) {
  382. message->length = 0;
  383. return UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
  384. }
  385. if (resultsize == -1) {
  386. message->length = 0;
  387. return UA_STATUSCODE_BADINTERNALERROR;
  388. }
  389. }
  390. if(channelConfigUDPMC->ai_family == PF_INET){
  391. ssize_t messageLength;
  392. messageLength = recvfrom(channel->sockfd, message->data, message->length, 0, NULL, NULL);
  393. if(messageLength > 0){
  394. message->length = (size_t) messageLength;
  395. } else {
  396. message->length = 0;
  397. }
  398. } else {
  399. //TODO implement recieve for IPv6
  400. }
  401. return UA_STATUSCODE_GOOD;
  402. }
  403. /**
  404. * Close channel and free the channel data.
  405. *
  406. * @return UA_STATUSCODE_GOOD if success
  407. */
  408. static UA_StatusCode
  409. UA_PubSubChannelUDPMC_close(UA_PubSubChannel *channel) {
  410. #ifdef _WIN32
  411. WSACleanup();
  412. #endif /* Not Windows */
  413. if(CLOSESOCKET(channel->sockfd) != 0){
  414. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection delete failed.");
  415. return UA_STATUSCODE_BADINTERNALERROR;
  416. }
  417. //cleanup the internal NetworkLayer data
  418. UA_PubSubChannelDataUDPMC *networkLayerData = (UA_PubSubChannelDataUDPMC *) channel->handle;
  419. UA_free(networkLayerData->ai_addr);
  420. UA_free(networkLayerData);
  421. UA_free(channel);
  422. return UA_STATUSCODE_GOOD;
  423. }
  424. /**
  425. * Generate a new channel. based on the given configuration.
  426. *
  427. * @param connectionConfig connection configuration
  428. * @return ref to created channel, NULL on error
  429. */
  430. static UA_PubSubChannel *
  431. TransportLayerUDPMC_addChannel(UA_PubSubConnectionConfig *connectionConfig) {
  432. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSub channel requested");
  433. UA_PubSubChannel * pubSubChannel = UA_PubSubChannelUDPMC_open(connectionConfig);
  434. if(pubSubChannel){
  435. pubSubChannel->regist = UA_PubSubChannelUDPMC_regist;
  436. pubSubChannel->unregist = UA_PubSubChannelUDPMC_unregist;
  437. pubSubChannel->send = UA_PubSubChannelUDPMC_send;
  438. pubSubChannel->receive = UA_PubSubChannelUDPMC_receive;
  439. pubSubChannel->close = UA_PubSubChannelUDPMC_close;
  440. pubSubChannel->connectionConfig = connectionConfig;
  441. }
  442. return pubSubChannel;
  443. }
  444. //UDPMC channel factory
  445. UA_PubSubTransportLayer
  446. UA_PubSubTransportLayerUDPMP() {
  447. UA_PubSubTransportLayer pubSubTransportLayer;
  448. pubSubTransportLayer.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
  449. pubSubTransportLayer.createPubSubChannel = &TransportLayerUDPMC_addChannel;
  450. return pubSubTransportLayer;
  451. }
  452. #undef _POSIX_C_SOURCE