networklayer_tcp_concurrent.c 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. /*
  2. * This work is licensed under a Creative Commons CCZero 1.0 Universal License.
  3. * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
  4. */
  5. #define _GNU_SOURCE
  6. #include <uv.h>
  7. #include <assert.h>
  8. #include <malloc.h>
  9. #include "networklayer_tcp.h"
  10. #include "ua_transport.h"
  11. struct NetworklayerTCP {
  12. UA_Server *server;
  13. uv_loop_t *uvloop;
  14. uv_tcp_t uvserver;
  15. UA_Boolean *running;
  16. UA_ConnectionConfig localConf;
  17. UA_UInt32 port;
  18. UA_UInt32 connectionsSize;
  19. };
  20. UA_Int32 NetworklayerTCP_new(NetworklayerTCP **newlayer, UA_ConnectionConfig localConf, UA_UInt32 port) {
  21. *newlayer = malloc(sizeof(NetworklayerTCP));
  22. if(newlayer == UA_NULL)
  23. return UA_ERROR;
  24. (*newlayer)->localConf = localConf;
  25. (*newlayer)->port = port;
  26. return UA_SUCCESS;
  27. }
  28. void NetworklayerTCP_delete(NetworklayerTCP *layer) {
  29. free(layer);
  30. }
  31. // callback structure to delete the buffer after the asynchronous write finished
  32. typedef struct {
  33. uv_write_t req;
  34. unsigned int bufsize;
  35. uv_buf_t *bufs;
  36. } write_req_t;
  37. static void on_close(uv_handle_t * handle) {
  38. if (handle->data) {
  39. //UA_Connection_deleteMembers((UA_Connection*) handle->data);
  40. free(handle->data);
  41. }
  42. free(handle);
  43. }
  44. void close(void *handle) {
  45. uv_close((uv_handle_t *)handle, on_close);
  46. }
  47. static void after_shutdown(uv_shutdown_t * req, int status) {
  48. uv_close((uv_handle_t *) req->handle, on_close);
  49. free(req);
  50. }
  51. static void after_write(uv_write_t * req, int status) {
  52. write_req_t *wr = (write_req_t*)req; // todo: use container_of
  53. for(UA_UInt32 i=0;i<wr->bufsize;i++)
  54. free(wr->bufs[i].base);
  55. free(wr->bufs);
  56. if (status) {
  57. printf("uv_write error");
  58. uv_close((uv_handle_t *) req->handle, on_close);
  59. }
  60. free(wr);
  61. }
  62. static void write(void *handle, const UA_ByteStringArray buf) {
  63. uv_buf_t *uv_bufs = malloc(buf.stringsSize * sizeof(uv_buf_t));
  64. for(UA_UInt32 i=0; i<buf.stringsSize; i++) {
  65. uv_bufs[i].len = buf.strings[i].length;
  66. uv_bufs[i].base = (char*)buf.strings[i].data;
  67. }
  68. write_req_t *wr = malloc(sizeof(write_req_t));
  69. wr->bufsize = buf.stringsSize;
  70. wr->bufs = uv_bufs;
  71. if(uv_write(&wr->req, (uv_stream_t*)handle, uv_bufs, buf.stringsSize, after_write))
  72. printf("uv_write failed");
  73. }
  74. static void handle_message(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
  75. if (nread < 0) {
  76. printf("connection ended");
  77. if (buf.base)
  78. free(buf.base);
  79. uv_shutdown_t *req = malloc(sizeof(uv_shutdown_t));
  80. uv_shutdown(req, handle, after_shutdown);
  81. return;
  82. }
  83. if (nread == 0) {
  84. free(buf.base);
  85. return;
  86. }
  87. NetworklayerTCP *layer = (NetworklayerTCP*)handle->loop->data;
  88. UA_Server *server = (UA_Server*) layer->server;
  89. UA_Connection *connection = (UA_Connection*) handle->data;
  90. UA_ByteString readBuffer;
  91. readBuffer.length = nread; // the buffer might be longer
  92. readBuffer.data = (UA_Byte*)buf.base;
  93. UA_Server_processBinaryMessage(server, connection, &readBuffer);
  94. free(buf.base);
  95. return;
  96. }
  97. static uv_buf_t read_alloc(uv_handle_t * handle, size_t suggested_size) {
  98. NetworklayerTCP *layer = (NetworklayerTCP*)handle->loop->data;
  99. UA_UInt32 receive_bufsize = layer->localConf.recvBufferSize;
  100. char* buf = malloc(sizeof(char)*receive_bufsize);
  101. return uv_buf_init(buf, receive_bufsize);
  102. }
  103. static void on_connection(uv_stream_t *server, int status) {
  104. if (status) {
  105. printf("Connect error");
  106. return;
  107. }
  108. uv_loop_t *loop = server->loop;
  109. NetworklayerTCP *layer= (NetworklayerTCP*)loop->data;
  110. uv_tcp_t *stream = malloc(sizeof(uv_tcp_t));
  111. if(uv_tcp_init(loop, stream))
  112. return;
  113. UA_Connection *connection = malloc(sizeof(UA_Connection));
  114. UA_Connection_init(connection, layer->localConf, stream, close, write);
  115. stream->data = connection;
  116. assert(uv_accept(server, (uv_stream_t*)stream) == 0);
  117. assert(uv_read_start((uv_stream_t*)stream, read_alloc, handle_message) == 0);
  118. }
  119. void check_running(uv_timer_t* handle, int status) {
  120. NetworklayerTCP *layer = (NetworklayerTCP*)handle->loop->data;
  121. if(!*layer->running)
  122. uv_stop(layer->uvloop);
  123. }
  124. UA_Int32 NetworkLayerTCP_run(NetworklayerTCP *layer, UA_Server *server, struct timeval tv, void(*worker)(UA_Server*), UA_Boolean *running) {
  125. layer->uvloop = uv_default_loop();
  126. layer->server = server;
  127. struct sockaddr_in addr = uv_ip4_addr("0.0.0.0", layer->port);
  128. if(uv_tcp_init(layer->uvloop, &layer->uvserver)) {
  129. printf("Socket creation error\n");
  130. return UA_ERROR;
  131. }
  132. if(uv_tcp_bind(&layer->uvserver, addr)) {
  133. printf("Bind error\n");
  134. return UA_ERROR;
  135. }
  136. #define MAXBACKLOG 10
  137. if(uv_listen((uv_stream_t*)&layer->uvserver, MAXBACKLOG, on_connection)) {
  138. printf("Listen error");
  139. return UA_ERROR;
  140. }
  141. layer->uvloop->data = (void*)layer; // so we can get the pointer to the server
  142. layer->running = running;
  143. uv_timer_t timer_check_running;
  144. uv_timer_init(layer->uvloop, &timer_check_running);
  145. uv_timer_start(&timer_check_running, check_running, 0, 500);
  146. uv_run(layer->uvloop, UV_RUN_DEFAULT);
  147. return UA_SUCCESS;
  148. }