networklayer_tcp_concurrent.c 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. /*
  2. * This work is licensed under a Creative Commons CCZero 1.0 Universal License.
  3. * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
  4. */
  5. #define _GNU_SOURCE
  6. #include <uv.h>
  7. #include <assert.h>
  8. #include <malloc.h>
  9. #include "networklayer_tcp.h"
  10. #include "ua_transport.h"
  11. struct NetworklayerTCP {
  12. UA_Server *server;
  13. uv_loop_t *uvloop;
  14. uv_tcp_t uvserver;
  15. UA_ConnectionConfig localConf;
  16. UA_UInt32 port;
  17. UA_UInt32 connectionsSize;
  18. };
  19. UA_Int32 NetworklayerTCP_new(NetworklayerTCP **newlayer, UA_ConnectionConfig localConf, UA_UInt32 port) {
  20. *newlayer = malloc(sizeof(NetworklayerTCP));
  21. if(newlayer == UA_NULL)
  22. return UA_ERROR;
  23. (*newlayer)->localConf = localConf;
  24. (*newlayer)->port = port;
  25. return UA_SUCCESS;
  26. }
  27. void NetworklayerTCP_delete(NetworklayerTCP *layer) {
  28. free(layer);
  29. }
  30. // callback structure to delete the buffer after the asynchronous write finished
  31. typedef struct {
  32. uv_write_t req;
  33. unsigned int bufsize;
  34. uv_buf_t *bufs;
  35. } write_req_t;
  36. static void on_close(uv_handle_t * handle) {
  37. if (handle->data) {
  38. //UA_Connection_deleteMembers((UA_Connection*) handle->data);
  39. free(handle->data);
  40. }
  41. free(handle);
  42. }
  43. void close(void *handle) {
  44. uv_close((uv_handle_t *)handle, on_close);
  45. }
  46. static void after_shutdown(uv_shutdown_t * req, int status) {
  47. uv_close((uv_handle_t *) req->handle, on_close);
  48. free(req);
  49. }
  50. static void after_write(uv_write_t * req, int status) {
  51. write_req_t *wr = (write_req_t*)req; // todo: use container_of
  52. for(UA_UInt32 i=0;i<wr->bufsize;i++)
  53. free(wr->bufs[i].base);
  54. free(wr->bufs);
  55. if (status) {
  56. printf("uv_write error");
  57. uv_close((uv_handle_t *) req->handle, on_close);
  58. }
  59. free(wr);
  60. }
  61. static void write(void *handle, const UA_ByteStringArray buf) {
  62. uv_buf_t *uv_bufs = malloc(buf.stringsSize * sizeof(uv_buf_t));
  63. for(UA_UInt32 i=0; i<buf.stringsSize; i++) {
  64. uv_bufs[i].len = buf.strings[i].length;
  65. uv_bufs[i].base = (char*)buf.strings[i].data;
  66. }
  67. write_req_t *wr = malloc(sizeof(write_req_t));
  68. wr->bufsize = buf.stringsSize;
  69. wr->bufs = uv_bufs;
  70. if(uv_write(&wr->req, (uv_stream_t*)handle, uv_bufs, buf.stringsSize, after_write))
  71. printf("uv_write failed");
  72. }
  73. static void handle_message(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
  74. if (nread < 0) {
  75. printf("connection ended");
  76. if (buf.base)
  77. free(buf.base);
  78. uv_shutdown_t *req = malloc(sizeof(uv_shutdown_t));
  79. uv_shutdown(req, handle, after_shutdown);
  80. return;
  81. }
  82. if (nread == 0) {
  83. free(buf.base);
  84. return;
  85. }
  86. NetworklayerTCP *layer = (NetworklayerTCP*)handle->loop->data;
  87. UA_Server *server = (UA_Server*) layer->server;
  88. UA_Connection *connection = (UA_Connection*) handle->data;
  89. UA_ByteString readBuffer;
  90. readBuffer.length = nread; // the buffer might be longer
  91. readBuffer.data = (UA_Byte*)buf.base;
  92. UA_Server_processBinaryMessage(server, connection, &readBuffer);
  93. free(buf.base);
  94. return;
  95. }
  96. static uv_buf_t read_alloc(uv_handle_t * handle, size_t suggested_size) {
  97. //UA_Server *server = (UA_Server*)handle->loop->data;
  98. UA_UInt32 receive_bufsize = 2048; // todo: server->layer.localConf.recvBufferSize;
  99. char* buf = malloc(sizeof(char)*receive_bufsize);
  100. return uv_buf_init(buf, receive_bufsize);
  101. }
  102. static void on_connection(uv_stream_t *server, int status) {
  103. if (status) {
  104. printf("Connect error");
  105. return;
  106. }
  107. uv_loop_t *loop = server->loop;
  108. NetworklayerTCP *layer= (NetworklayerTCP*)loop->data;
  109. uv_tcp_t *stream = malloc(sizeof(uv_tcp_t));
  110. if(uv_tcp_init(loop, stream))
  111. return;
  112. UA_Connection *connection = malloc(sizeof(UA_Connection));
  113. UA_Connection_init(connection, layer->localConf, stream, close, write);
  114. stream->data = connection;
  115. assert(uv_accept(server, (uv_stream_t*)stream) == 0);
  116. assert(uv_read_start((uv_stream_t*)stream, read_alloc, handle_message) == 0);
  117. }
  118. UA_Int32 NetworkLayerTCP_run(NetworklayerTCP *layer, UA_Server *server, struct timeval tv, void(*worker)(UA_Server*), UA_Boolean *running) {
  119. layer->uvloop = uv_default_loop();
  120. layer->server = server;
  121. struct sockaddr_in addr = uv_ip4_addr("0.0.0.0", layer->port);
  122. if(uv_tcp_init(layer->uvloop, &layer->uvserver)) {
  123. printf("Socket creation error\n");
  124. return UA_ERROR;
  125. }
  126. if(uv_tcp_bind(&layer->uvserver, addr)) {
  127. printf("Bind error\n");
  128. return UA_ERROR;
  129. }
  130. #define MAXBACKLOG 10
  131. if(uv_listen((uv_stream_t*)&layer->uvserver, MAXBACKLOG, on_connection)) {
  132. printf("Listen error");
  133. return UA_ERROR;
  134. }
  135. layer->uvloop->data = (void*)layer; // so we can get the pointer to the server
  136. uv_run(layer->uvloop, UV_RUN_DEFAULT);
  137. uv_loop_delete(layer->uvloop);
  138. return UA_SUCCESS;
  139. }