/* ua_stack.c */
  1. #include <sys/types.h>
  2. #include <sys/socket.h>
  3. #include <netinet/in.h>
  4. #include <sys/socketvar.h>
  5. #include <unistd.h> // read, write, close
  6. #include <stdlib.h> // exit
  7. #include <errno.h> // errno, EINTR
  8. #include <memory.h> // memset
  9. #include <fcntl.h> // fcntl
  10. #include "ua_stack.h"
  11. #include "ua_transportLayer.h"
  12. UA_TL_Description UA_TransportLayerDescriptorTcpBinary = {
  13. UA_TL_ENCODING_BINARY,
  14. UA_TL_CONNECTIONTYPE_TCPV4,
  15. UA_TL_MAXCONNECTIONS_DEFAULT,
  16. {-1,8192,8192,16384,1}
  17. };
  18. // TODO: We currently need a variable global to the module!
  19. UA_TL_data theTL;
  20. _Bool connectionComparer(void *p1, void* p2) {
  21. UA_TL_connection* c1 = (UA_TL_connection*) p1;
  22. UA_TL_connection* c2 = (UA_TL_connection*) p2;
  23. return (c1->connectionHandle == c2->connectionHandle);
  24. }
  25. int UA_TL_TCP_SetNonBlocking(int sock) {
  26. int opts = fcntl(sock,F_GETFL);
  27. if (opts < 0) {
  28. perror("fcntl(F_GETFL)");
  29. return -1;
  30. }
  31. opts = (opts | O_NONBLOCK);
  32. if (fcntl(sock,F_SETFL,opts) < 0) {
  33. perror("fcntl(F_SETFL)");
  34. return -1;
  35. }
  36. return 0;
  37. }
  38. /** the tcp reader thread - single shot if single-threaded, looping until CLOSE if multi-threaded
  39. */
  40. void* UA_TL_TCP_reader(UA_TL_connection *c) {
  41. UA_ByteString readBuffer;
  42. UA_alloc((void**)&(readBuffer.data),c->localConf.recvBufferSize);
  43. if (c->connectionState != connectionState_CLOSE) {
  44. do {
  45. printf("UA_TL_TCP_reader - enter read\n");
  46. readBuffer.length = read(c->connectionHandle, readBuffer.data, c->localConf.recvBufferSize);
  47. printf("UA_TL_TCP_reader - leave read\n");
  48. printf("UA_TL_TCP_reader - %*.s ",c->remoteEndpointUrl.length,c->remoteEndpointUrl.data);
  49. UA_ByteString_printx("received=",&readBuffer);
  50. if (readBuffer.length > 0) {
  51. TL_process(c,&readBuffer);
  52. } else {
  53. c->connectionState = connectionState_CLOSE;
  54. perror("ERROR reading from socket1");
  55. }
  56. } while (c->connectionState != connectionState_CLOSE && theTL.threaded == UA_STACK_MULTITHREADED);
  57. }
  58. if (c->connectionState == connectionState_CLOSE) {
  59. // clean up: socket, buffer, connection
  60. // free resources allocated with socket
  61. if (theTL.threaded == UA_STACK_SINGLETHREADED) {
  62. printf("UA_TL_TCP_reader - remove handle=%d from fd_set\n",c->connectionHandle);
  63. // FD_CLR(c->connectionHandle,&(theTL.readerHandles));
  64. }
  65. printf("UA_TL_TCP_reader - shutdown\n");
  66. printf("UA_TL_TCP_reader - enter shutdown\n");
  67. shutdown(c->connectionHandle,2);
  68. printf("UA_TL_TCP_reader - enter close\n");
  69. close(c->connectionHandle);
  70. printf("UA_TL_TCP_reader - leave close\n");
  71. c->connectionState = connectionState_CLOSED;
  72. UA_ByteString_deleteMembers(&readBuffer);
  73. printf("UA_TL_TCP_reader - search element to remove\n");
  74. UA_list_Element* lec = UA_list_search(&theTL.connections,connectionComparer,c);
  75. printf("UA_TL_TCP_reader - remove handle=%d\n",((UA_TL_connection*)lec->payload)->connectionHandle);
  76. UA_list_removeElement(lec,UA_NULL);
  77. UA_free(c);
  78. }
  79. return UA_NULL;
  80. }
  81. /** write to a tcp transport layer connection */
  82. UA_Int32 UA_TL_TCP_writer(struct T_TL_connection* c, UA_ByteString* msg) {
  83. UA_ByteString_printx("write data:", msg);
  84. int nWritten = 0;
  85. while (nWritten < msg->length) {
  86. int n=0;
  87. do {
  88. printf("UA_TL_TCP_write - enter write\n");
  89. n = write(c->connectionHandle, &(msg->data[nWritten]), msg->length-nWritten);
  90. printf("UA_TL_TCP_write - leave write with n=%d,errno=%d\n",n,errno);
  91. } while (n == -1L && errno == EINTR);
  92. if (n >= 0) {
  93. nWritten += n;
  94. } else {
  95. // TODO: error handling
  96. }
  97. }
  98. return UA_SUCCESS;
  99. }
  100. /** the tcp listener routine.
  101. * does a single shot if single threaded, runs forever if multithreaded
  102. */
  103. void* UA_TL_TCP_listen(void *p) {
  104. UA_TL_data* tld = (UA_TL_data*) p;
  105. UA_String_printf("open62541-server at ",&(tld->endpointUrl));
  106. do {
  107. printf("UA_TL_TCP_listen - enter listen\n");
  108. int retval = listen(tld->listenerHandle, tld->tld->maxConnections);
  109. printf("UA_TL_TCP_listen - leave listen, retval=%d\n",retval);
  110. if (retval < 0) {
  111. // TODO: Error handling
  112. printf("UA_TL_TCP_listen retval=%d, errno=%d\n",retval,errno);
  113. } else if (tld->tld->maxConnections == -1 || tld->connections.size < tld->tld->maxConnections) {
  114. // accept only if not max number of connections exceede
  115. struct sockaddr_in cli_addr;
  116. socklen_t cli_len = sizeof(cli_addr);
  117. printf("UA_TL_TCP_listen - enter accept\n");
  118. int newsockfd = accept(tld->listenerHandle, (struct sockaddr *) &cli_addr, &cli_len);
  119. printf("UA_TL_TCP_listen - leave accept\n");
  120. if (newsockfd < 0) {
  121. printf("UA_TL_TCP_listen - accept returns errno=%d\n",errno);
  122. perror("ERROR on accept");
  123. } else {
  124. printf("UA_TL_TCP_listen - new connection on %d\n",newsockfd);
  125. UA_TL_connection* c;
  126. UA_Int32 retval = UA_SUCCESS;
  127. retval |= UA_alloc((void**)&c,sizeof(UA_TL_connection));
  128. TL_Connection_init(c, tld);
  129. c->connectionHandle = newsockfd;
  130. c->writerCallback = UA_TL_TCP_writer;
  131. c->readerCallback = UA_TL_TCP_reader;
  132. // add to list
  133. UA_list_addPayloadToBack(&(tld->connections),c);
  134. if (tld->threaded == UA_STACK_MULTITHREADED) {
  135. // TODO: handle retval of pthread_create
  136. pthread_create( &(c->readerThread), NULL, (void*(*)(void*)) UA_TL_TCP_reader, (void*) c);
  137. } else {
  138. UA_TL_TCP_SetNonBlocking(c->connectionHandle);
  139. }
  140. }
  141. } else {
  142. // no action necessary to reject connection
  143. }
  144. } while (tld->threaded == UA_STACK_MULTITHREADED);
  145. return UA_NULL;
  146. }
  147. void checkFdSet(void* payload) {
  148. UA_TL_connection* c = (UA_TL_connection*) payload;
  149. if (FD_ISSET(c->connectionHandle, &(theTL.readerHandles))) {
  150. c->readerCallback((void*)c);
  151. }
  152. }
  153. int maxHandle;
  154. void UA_TL_addHandleToSet(UA_Int32 handle) {
  155. FD_SET(handle, &(theTL.readerHandles));
  156. maxHandle = (handle > maxHandle) ? handle : maxHandle;
  157. }
  158. void setFdSet(void* payload) {
  159. UA_TL_connection* c = (UA_TL_connection*) payload;
  160. UA_TL_addHandleToSet(c->connectionHandle);
  161. }
  162. UA_Int32 UA_Stack_addReaderHandle(UA_Int32 handle, UA_TL_reader reader) {
  163. UA_Int32 retval = UA_SUCCESS;
  164. UA_TL_connection* c;
  165. retval = UA_alloc((void**)&c,sizeof(UA_TL_connection));
  166. c->connectionHandle = handle;
  167. c->connectionState = connectionState_ESTABLISHED;
  168. c->readerCallback = reader;
  169. return retval;
  170. }
  171. UA_Int32 UA_Stack_msgLoop(struct timeval *tv, UA_Int32(*worker)(void*), void *arg) {
  172. UA_Int32 result;
  173. while (UA_TRUE) {
  174. // determine the largest handle
  175. maxHandle = 0;
  176. UA_list_iteratePayload(&theTL.connections,setFdSet);
  177. UA_TL_addHandleToSet(theTL.listenerHandle);
  178. printf("UA_Stack_msgLoop - maxHandle=%d\n", maxHandle);
  179. // copy tv, some unixes do overwrite and return the remaining time
  180. struct timeval tmptv;
  181. memcpy(&tmptv,tv,sizeof(struct timeval));
  182. // and wait
  183. printf("UA_Stack_msgLoop - enter select sec=%d,usec=%d\n",(UA_Int32) tmptv.tv_sec, (UA_Int32) tmptv.tv_usec);
  184. result = select(maxHandle + 1, &(theTL.readerHandles), UA_NULL, UA_NULL,&tmptv);
  185. printf("UA_Stack_msgLoop - leave select result=%d,sec=%d,usec=%d\n",result, (UA_Int32) tmptv.tv_sec, (UA_Int32) tmptv.tv_usec);
  186. if (result == 0) {
  187. int err = errno;
  188. switch (err) {
  189. case EBADF:
  190. printf("UA_Stack_msgLoop - result=bad file\n"); //FIXME: handle
  191. break;
  192. case EINTR:
  193. printf("UA_Stack_msgLoop - result=interupted\n"); //FIXME: handle
  194. break;
  195. case EINVAL:
  196. printf("UA_Stack_msgLoop - result=bad arguments\n"); //FIXME: handle
  197. break;
  198. case EAGAIN:
  199. printf("UA_Stack_msgLoop - result=do it again\n");
  200. default:
  201. printf("UA_Stack_msgLoop - result=%d\n",err);
  202. worker(arg);
  203. }
  204. } else if (FD_ISSET(theTL.listenerHandle,&theTL.readerHandles)) { // activity on listener port
  205. printf("UA_Stack_msgLoop - connection request\n");
  206. UA_TL_TCP_listen((void*)&theTL);
  207. } else { // activity on client ports
  208. printf("UA_Stack_msgLoop - activities on %d handles\n",result);
  209. UA_list_iteratePayload(&theTL.connections,checkFdSet);
  210. }
  211. }
  212. return UA_SUCCESS;
  213. }
  214. UA_Int32 UA_TL_TCP_init(UA_TL_data* tld, UA_Int32 port) {
  215. UA_Int32 retval = UA_SUCCESS;
  216. // socket variables
  217. int optval = 1;
  218. struct sockaddr_in serv_addr;
  219. // create socket for listening to incoming connections
  220. tld->listenerHandle = socket(PF_INET, SOCK_STREAM, 0);
  221. if (tld->listenerHandle < 0) {
  222. perror("ERROR opening socket");
  223. retval = UA_ERROR;
  224. } else {
  225. // set port number, options and bind
  226. memset((void *) &serv_addr, sizeof(serv_addr),1);
  227. serv_addr.sin_family = AF_INET;
  228. serv_addr.sin_addr.s_addr = INADDR_ANY;
  229. serv_addr.sin_port = htons(port);
  230. if (setsockopt(tld->listenerHandle, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) == -1 ) {
  231. perror("setsockopt");
  232. retval = UA_ERROR;
  233. } else {
  234. // bind to port
  235. if (bind(tld->listenerHandle, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
  236. perror("ERROR on binding");
  237. retval = UA_ERROR;
  238. } else {
  239. // TODO: implement
  240. // UA_String_sprintf("opc.tpc://localhost:%d", &(tld->endpointUrl), port);
  241. UA_String_copycstring("opc.tpc://localhost:16664/", &(tld->endpointUrl));
  242. }
  243. }
  244. }
  245. // finally
  246. if (retval == UA_SUCCESS) {
  247. if (tld->threaded == UA_STACK_MULTITHREADED) {
  248. // TODO: handle retval of pthread_create
  249. pthread_create( &tld->listenerThreadHandle, NULL, UA_TL_TCP_listen, (void*) tld);
  250. } else {
  251. UA_TL_TCP_SetNonBlocking(tld->listenerHandle);
  252. FD_ZERO(&(tld->readerHandles));
  253. }
  254. }
  255. return retval;
  256. }
  257. /** checks arguments and dispatches to worker or refuses to init */
  258. UA_Int32 UA_TL_init(UA_TL_Description* tlDesc, UA_Int32 port, UA_Int32 threaded) {
  259. UA_Int32 retval = UA_SUCCESS;
  260. if (tlDesc->connectionType == UA_TL_CONNECTIONTYPE_TCPV4 && tlDesc->encoding == UA_TL_ENCODING_BINARY) {
  261. theTL.tld = tlDesc;
  262. theTL.threaded = threaded;
  263. UA_list_init(&theTL.connections);
  264. retval |= UA_TL_TCP_init(&theTL,port);
  265. } else {
  266. retval = UA_ERR_NOT_IMPLEMENTED;
  267. }
  268. return retval;
  269. }
  270. /** checks arguments and dispatches to worker or refuses to init */
  271. UA_Int32 UA_Stack_init(UA_TL_Description* tlDesc, UA_Int32 port, UA_Int32 threaded) {
  272. UA_Int32 retval = UA_SUCCESS;
  273. retval = UA_TL_init(tlDesc,port,threaded);
  274. return retval;
  275. }