|
@@ -10,7 +10,6 @@
|
|
|
#include <memory.h> // memset
|
|
|
#include <fcntl.h> // fcntl
|
|
|
|
|
|
-#define VERBOSE 1
|
|
|
#include "networklayer.h"
|
|
|
|
|
|
NL_Description NL_Description_TcpBinary = {
|
|
@@ -20,12 +19,17 @@ NL_Description NL_Description_TcpBinary = {
|
|
|
{-1,8192,8192,16384,1}
|
|
|
};
|
|
|
|
|
|
+/* If we do not have multitasking, we implement a dispatcher-Pattern. All Connections
|
|
|
+ * are collected in a list. From this list a fd_set is prepared and select then waits
|
|
|
+ * for activities. We then iterate over the list, check if we've got some activites
|
|
|
+ * and call the corresponding callback (reader, listener).
|
|
|
+ */
|
|
|
+#ifndef MULTITASKING
|
|
|
_Bool NL_ConnectionComparer(void *p1, void* p2) {
|
|
|
NL_Connection* c1 = (NL_Connection*) p1;
|
|
|
NL_Connection* c2 = (NL_Connection*) p2;
|
|
|
return (c1->connection.connectionHandle == c2->connection.connectionHandle);
|
|
|
}
|
|
|
-
|
|
|
int NL_TCP_SetNonBlocking(int sock) {
|
|
|
int opts = fcntl(sock,F_GETFL);
|
|
|
if (opts < 0) {
|
|
@@ -44,31 +48,84 @@ void NL_Connection_printf(void* payload) {
|
|
|
NL_Connection* c = (NL_Connection*) payload;
|
|
|
printf("ListElement connectionHandle = %d\n",c->connection.connectionHandle);
|
|
|
}
|
|
|
+void NL_addHandleToSet(UA_Int32 handle, NL_data* nl) {
|
|
|
+ FD_SET(handle, &(nl->readerHandles));
|
|
|
+ nl->maxReaderHandle = (handle > nl->maxReaderHandle) ? handle : nl->maxReaderHandle;
|
|
|
+}
|
|
|
+void NL_setFdSet(void* payload) {
|
|
|
+ NL_Connection* c = (NL_Connection*) payload;
|
|
|
+ NL_addHandleToSet(c->connection.connectionHandle, c->networkLayer);
|
|
|
+}
|
|
|
+void NL_checkFdSet(void* payload) {
|
|
|
+ NL_Connection* c = (NL_Connection*) payload;
|
|
|
+ if (FD_ISSET(c->connection.connectionHandle, &(c->networkLayer->readerHandles))) {
|
|
|
+ c->reader((void*)c);
|
|
|
+ }
|
|
|
+}
|
|
|
+UA_Int32 NL_msgLoop(NL_data* nl, struct timeval *tv, UA_Int32(*worker)(void*), void *arg) {
|
|
|
+ UA_Int32 result;
|
|
|
+ while (UA_TRUE) {
|
|
|
+ // determine the largest handle
|
|
|
+ nl->maxReaderHandle = 0;
|
|
|
+ UA_list_iteratePayload(&(nl->connections),NL_setFdSet);
|
|
|
+ DBG_VERBOSE(printf("UA_Stack_msgLoop - maxHandle=%d\n", nl->maxReaderHandle));
|
|
|
|
|
|
-/** the tcp reader thread - single shot if single-threaded, looping until CLOSE if multi-threaded
|
|
|
- */
|
|
|
+ // copy tv, some unixes do overwrite and return the remaining time
|
|
|
+ struct timeval tmptv;
|
|
|
+ memcpy(&tmptv,tv,sizeof(struct timeval));
|
|
|
+
|
|
|
+ // and wait
|
|
|
+ DBG_VERBOSE(printf("UA_Stack_msgLoop - enter select sec=%d,usec=%d\n",(UA_Int32) tmptv.tv_sec, (UA_Int32) tmptv.tv_usec));
|
|
|
+ result = select(nl->maxReaderHandle + 1, &(nl->readerHandles), UA_NULL, UA_NULL,&tmptv);
|
|
|
+ DBG_VERBOSE(printf("UA_Stack_msgLoop - leave select result=%d,sec=%d,usec=%d\n",result, (UA_Int32) tmptv.tv_sec, (UA_Int32) tmptv.tv_usec));
|
|
|
+ if (result == 0) {
|
|
|
+ int err = errno;
|
|
|
+ switch (err) {
|
|
|
+ case EBADF:
|
|
|
+ case EINTR:
|
|
|
+ case EINVAL:
|
|
|
+ //FIXME: handle errors
|
|
|
+ DBG_ERR(printf("UA_Stack_msgLoop - errno={%d,%s}\n", errno, strerror(errno)));
|
|
|
+ break;
|
|
|
+ case EAGAIN:
|
|
|
+ default:
|
|
|
+ DBG_VERBOSE(printf("UA_Stack_msgLoop - errno={%d,%s}\n", errno, strerror(errno)));
|
|
|
+ DBG_VERBOSE(printf("UA_Stack_msgLoop - call worker\n"));
|
|
|
+ worker(arg);
|
|
|
+ DBG_VERBOSE(printf("UA_Stack_msgLoop - return from worker\n"));
|
|
|
+ }
|
|
|
+ } else { // activity on listener or client ports
|
|
|
+ DBG_VERBOSE(printf("UA_Stack_msgLoop - activities on %d handles\n",result));
|
|
|
+ UA_list_iteratePayload(&(nl->connections),NL_checkFdSet);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return UA_SUCCESS;
|
|
|
+}
|
|
|
+#endif
|
|
|
+
|
|
|
+
|
|
|
+/** the tcp reader function */
|
|
|
void* NL_TCP_reader(NL_Connection *c) {
|
|
|
|
|
|
UA_ByteString readBuffer;
|
|
|
UA_alloc((void**)&(readBuffer.data),c->connection.localConf.recvBufferSize);
|
|
|
|
|
|
if (c->connection.connectionState != CONNECTIONSTATE_CLOSE) {
|
|
|
- do {
|
|
|
- DBG_VERBOSE(printf("NL_TCP_reader - enter read\n"));
|
|
|
- readBuffer.length = read(c->connection.connectionHandle, readBuffer.data, c->connection.localConf.recvBufferSize);
|
|
|
- DBG_VERBOSE(printf("NL_TCP_reader - leave read\n"));
|
|
|
+ DBG_VERBOSE(printf("NL_TCP_reader - enter read\n"));
|
|
|
+ readBuffer.length = read(c->connection.connectionHandle, readBuffer.data, c->connection.localConf.recvBufferSize);
|
|
|
+ DBG_VERBOSE(printf("NL_TCP_reader - leave read\n"));
|
|
|
|
|
|
- DBG_VERBOSE(printf("NL_TCP_reader - src={%*.s}, ",c->connection.remoteEndpointUrl.length,c->connection.remoteEndpointUrl.data));
|
|
|
- UA_ByteString_printx("received=",&readBuffer);
|
|
|
+ DBG_VERBOSE(printf("NL_TCP_reader - src={%*.s}, ",c->connection.remoteEndpointUrl.length,c->connection.remoteEndpointUrl.data));
|
|
|
+ UA_ByteString_printx("received=",&readBuffer);
|
|
|
|
|
|
- if (readBuffer.length > 0) {
|
|
|
- TL_Process(&(c->connection),&readBuffer);
|
|
|
- } else {
|
|
|
- c->connection.connectionState = CONNECTIONSTATE_CLOSE;
|
|
|
- perror("ERROR reading from socket1");
|
|
|
- }
|
|
|
- } while (c->connection.connectionState != CONNECTIONSTATE_CLOSE);
|
|
|
+ if (readBuffer.length > 0) {
|
|
|
+ TL_Process(&(c->connection),&readBuffer);
|
|
|
+ } else {
|
|
|
+ c->connection.connectionState = CONNECTIONSTATE_CLOSE;
|
|
|
+ perror("ERROR reading from socket1");
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
if (c->connection.connectionState == CONNECTIONSTATE_CLOSE) {
|
|
|
DBG_VERBOSE(printf("NL_TCP_reader - enter shutdown\n"));
|
|
|
shutdown(c->connection.connectionHandle,2);
|
|
@@ -78,16 +135,33 @@ void* NL_TCP_reader(NL_Connection *c) {
|
|
|
c->connection.connectionState = CONNECTIONSTATE_CLOSED;
|
|
|
|
|
|
UA_ByteString_deleteMembers(&readBuffer);
|
|
|
+
|
|
|
+#ifndef MULTITHREADING
|
|
|
DBG_VERBOSE(printf("NL_TCP_reader - search element to remove\n"));
|
|
|
UA_list_Element* lec = UA_list_search(&(c->networkLayer->connections),NL_ConnectionComparer,c);
|
|
|
DBG_VERBOSE(printf("NL_TCP_reader - remove connection for handle=%d\n",((NL_Connection*)lec->payload)->connection.connectionHandle));
|
|
|
UA_list_removeElement(lec,UA_NULL);
|
|
|
DBG_VERBOSE(UA_list_iteratePayload(&(c->networkLayer->connections),NL_Connection_printf));
|
|
|
UA_free(c);
|
|
|
+#endif
|
|
|
}
|
|
|
return UA_NULL;
|
|
|
}
|
|
|
|
|
|
+#ifdef MULTITHREADING
|
|
|
+/** the tcp reader thread */
|
|
|
+void* NL_TCP_readerThread(NL_Connection *c) {
|
|
|
+ // just loop, NL_TCP_Reader will call the stack
|
|
|
+ do {
|
|
|
+ NL_TCP_reader(c);
|
|
|
+ } while (c->connection.connectionState != CONNECTIONSTATE_CLOSED);
|
|
|
+ // clean up
|
|
|
+ UA_free(c);
|
|
|
+ c->readerThreadHandle = -1;
|
|
|
+ pthread_exit(UA_NULL);
|
|
|
+}
|
|
|
+#endif
|
|
|
+
|
|
|
/** write to a tcp transport layer connection */
|
|
|
UA_Int32 NL_TCP_writer(TL_Connection* c, UA_ByteString** gather_buf, UA_UInt32 gather_len) {
|
|
|
|
|
@@ -101,19 +175,20 @@ UA_Int32 NL_TCP_writer(TL_Connection* c, UA_ByteString** gather_buf, UA_UInt32 g
|
|
|
}
|
|
|
|
|
|
struct msghdr message;
|
|
|
- message.msg_name = 0;
|
|
|
+ message.msg_name = UA_NULL;
|
|
|
message.msg_namelen = 0;
|
|
|
message.msg_iov = iov;
|
|
|
message.msg_iovlen = gather_len;
|
|
|
- message.msg_control = 0;
|
|
|
+ message.msg_control = UA_NULL;
|
|
|
message.msg_controllen = 0;
|
|
|
+ message.msg_flags = 0;
|
|
|
|
|
|
UA_UInt32 nWritten = 0;
|
|
|
while (nWritten < total_len) {
|
|
|
int n=0;
|
|
|
do {
|
|
|
DBG_VERBOSE(printf("NL_TCP_writer - enter write\n"));
|
|
|
- n = write(c->connectionHandle, &message, 0);
|
|
|
+ n = sendmsg(c->connectionHandle, &message, 0);
|
|
|
DBG_VERBOSE(printf("NL_TCP_writer - leave write with n=%d,errno={%d,%s}\n",n,errno,strerror(errno)));
|
|
|
} while (n == -1L && errno == EINTR);
|
|
|
if (n >= 0) {
|
|
@@ -146,107 +221,57 @@ void* NL_Connection_init(NL_Connection* c, NL_data* tld, UA_Int32 connectionHand
|
|
|
return UA_NULL;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-/** the tcp listener routine.
|
|
|
- * does a single shot if single threaded, runs forever if multithreaded
|
|
|
- */
|
|
|
+/** the tcp listener routine */
|
|
|
void* NL_TCP_listen(NL_Connection* c) {
|
|
|
NL_data* tld = c->networkLayer;
|
|
|
|
|
|
- do {
|
|
|
- DBG_VERBOSE(printf("NL_TCP_listen - enter listen\n"));
|
|
|
- int retval = listen(c->connection.connectionHandle, tld->tld->maxConnections);
|
|
|
- DBG_VERBOSE(printf("NL_TCP_listen - leave listen, retval=%d\n",retval));
|
|
|
-
|
|
|
- if (retval < 0) {
|
|
|
- // TODO: Error handling
|
|
|
- perror("NL_TCP_listen");
|
|
|
- DBG_ERR(printf("NL_TCP_listen retval=%d, errno={%d,%s}\n",retval,errno,strerror(errno)));
|
|
|
- } else if (tld->tld->maxConnections == -1 || tld->connections.size < tld->tld->maxConnections) {
|
|
|
- // accept only if not max number of connections exceeded
|
|
|
- struct sockaddr_in cli_addr;
|
|
|
- socklen_t cli_len = sizeof(cli_addr);
|
|
|
- DBG_VERBOSE(printf("NL_TCP_listen - enter accept\n"));
|
|
|
- int newsockfd = accept(c->connection.connectionHandle, (struct sockaddr *) &cli_addr, &cli_len);
|
|
|
- DBG_VERBOSE(printf("NL_TCP_listen - leave accept\n"));
|
|
|
- if (newsockfd < 0) {
|
|
|
- DBG_ERR(printf("TL_TCP_listen - accept returns errno={%d,%s}\n",errno,strerror(errno)));
|
|
|
- perror("ERROR on accept");
|
|
|
- } else {
|
|
|
- DBG_VERBOSE(printf("NL_TCP_listen - new connection on %d\n",newsockfd));
|
|
|
- NL_Connection* cclient;
|
|
|
- UA_Int32 retval = UA_SUCCESS;
|
|
|
- retval |= UA_alloc((void**)&cclient,sizeof(NL_Connection));
|
|
|
- NL_Connection_init(cclient, tld, newsockfd, NL_TCP_reader, (TL_Writer) NL_TCP_writer);
|
|
|
- UA_list_addPayloadToBack(&(tld->connections),cclient);
|
|
|
+ DBG_VERBOSE(printf("NL_TCP_listen - enter listen\n"));
|
|
|
+ int retval = listen(c->connection.connectionHandle, tld->tld->maxConnections);
|
|
|
+ DBG_VERBOSE(printf("NL_TCP_listen - leave listen, retval=%d\n",retval));
|
|
|
+
|
|
|
+ if (retval < 0) {
|
|
|
+ // TODO: Error handling
|
|
|
+ perror("NL_TCP_listen");
|
|
|
+ DBG_ERR(printf("NL_TCP_listen retval=%d, errno={%d,%s}\n",retval,errno,strerror(errno)));
|
|
|
+ } else if (tld->tld->maxConnections == -1 || tld->connections.size < tld->tld->maxConnections) {
|
|
|
+ // accept only if not max number of connections exceeded
|
|
|
+ struct sockaddr_in cli_addr;
|
|
|
+ socklen_t cli_len = sizeof(cli_addr);
|
|
|
+ DBG_VERBOSE(printf("NL_TCP_listen - enter accept\n"));
|
|
|
+ int newsockfd = accept(c->connection.connectionHandle, (struct sockaddr *) &cli_addr, &cli_len);
|
|
|
+ DBG_VERBOSE(printf("NL_TCP_listen - leave accept\n"));
|
|
|
+ if (newsockfd < 0) {
|
|
|
+ DBG_ERR(printf("TL_TCP_listen - accept returns errno={%d,%s}\n",errno,strerror(errno)));
|
|
|
+ perror("ERROR on accept");
|
|
|
+ } else {
|
|
|
+ DBG_VERBOSE(printf("NL_TCP_listen - new connection on %d\n",newsockfd));
|
|
|
+ NL_Connection* cclient;
|
|
|
+ UA_Int32 retval = UA_SUCCESS;
|
|
|
+ retval |= UA_alloc((void**)&cclient,sizeof(NL_Connection));
|
|
|
+ NL_Connection_init(cclient, tld, newsockfd, NL_TCP_reader, (TL_Writer) NL_TCP_writer);
|
|
|
#ifdef MULTITHREADING
|
|
|
- pthread_create( &(cclient->readerThreadHandle), NULL, (void*(*)(void*)) NL_TCP_reader, (void*) cclient);
|
|
|
+ pthread_create( &(cclient->readerThreadHandle), NULL, (void*(*)(void*)) NL_TCP_readerThread, (void*) cclient);
|
|
|
#else
|
|
|
- NL_TCP_SetNonBlocking(cclient->connection.connectionHandle);
|
|
|
+ UA_list_addPayloadToBack(&(tld->connections),cclient);
|
|
|
+ NL_TCP_SetNonBlocking(cclient->connection.connectionHandle);
|
|
|
#endif
|
|
|
- }
|
|
|
- } else {
|
|
|
- // no action necessary to reject connection
|
|
|
}
|
|
|
- } while (1);
|
|
|
+ } else {
|
|
|
+ // no action necessary to reject connection
|
|
|
+ }
|
|
|
return UA_NULL;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-void NL_addHandleToSet(UA_Int32 handle, NL_data* nl) {
|
|
|
- FD_SET(handle, &(nl->readerHandles));
|
|
|
- nl->maxReaderHandle = (handle > nl->maxReaderHandle) ? handle : nl->maxReaderHandle;
|
|
|
-}
|
|
|
-void NL_setFdSet(void* payload) {
|
|
|
- NL_Connection* c = (NL_Connection*) payload;
|
|
|
- NL_addHandleToSet(c->connection.connectionHandle, c->networkLayer);
|
|
|
-}
|
|
|
-void NL_checkFdSet(void* payload) {
|
|
|
- NL_Connection* c = (NL_Connection*) payload;
|
|
|
- if (FD_ISSET(c->connection.connectionHandle, &(c->networkLayer->readerHandles))) {
|
|
|
- c->reader((void*)c);
|
|
|
- }
|
|
|
+#ifdef MULTITHREADING
|
|
|
+void* NL_TCP_listenThread(NL_Connection* c) {
|
|
|
+ do {
|
|
|
+ NL_TCP_listen(c);
|
|
|
+ } while (UA_TRUE);
|
|
|
+ UA_free(c);
|
|
|
+ pthread_exit(UA_NULL);
|
|
|
}
|
|
|
+#endif
|
|
|
|
|
|
-UA_Int32 NL_msgLoop(NL_data* nl, struct timeval *tv, UA_Int32(*worker)(void*), void *arg) {
|
|
|
- UA_Int32 result;
|
|
|
- while (UA_TRUE) {
|
|
|
- // determine the largest handle
|
|
|
- nl->maxReaderHandle = 0;
|
|
|
- UA_list_iteratePayload(&(nl->connections),NL_setFdSet);
|
|
|
- DBG_VERBOSE(printf("UA_Stack_msgLoop - maxHandle=%d\n", nl->maxReaderHandle));
|
|
|
-
|
|
|
- // copy tv, some unixes do overwrite and return the remaining time
|
|
|
- struct timeval tmptv;
|
|
|
- memcpy(&tmptv,tv,sizeof(struct timeval));
|
|
|
-
|
|
|
- // and wait
|
|
|
- DBG_VERBOSE(printf("UA_Stack_msgLoop - enter select sec=%d,usec=%d\n",(UA_Int32) tmptv.tv_sec, (UA_Int32) tmptv.tv_usec));
|
|
|
- result = select(nl->maxReaderHandle + 1, &(nl->readerHandles), UA_NULL, UA_NULL,&tmptv);
|
|
|
- DBG_VERBOSE(printf("UA_Stack_msgLoop - leave select result=%d,sec=%d,usec=%d\n",result, (UA_Int32) tmptv.tv_sec, (UA_Int32) tmptv.tv_usec));
|
|
|
- if (result == 0) {
|
|
|
- int err = errno;
|
|
|
- switch (err) {
|
|
|
- case EBADF:
|
|
|
- case EINTR:
|
|
|
- case EINVAL:
|
|
|
- //FIXME: handle errors
|
|
|
- DBG_ERR(printf("UA_Stack_msgLoop - errno={%d,%s}\n", errno, strerror(errno)));
|
|
|
- break;
|
|
|
- case EAGAIN:
|
|
|
- default:
|
|
|
- DBG_VERBOSE(printf("UA_Stack_msgLoop - errno={%d,%s}\n", errno, strerror(errno)));
|
|
|
- DBG_VERBOSE(printf("UA_Stack_msgLoop - call worker\n"));
|
|
|
- worker(arg);
|
|
|
- DBG_VERBOSE(printf("UA_Stack_msgLoop - return from worker\n"));
|
|
|
- }
|
|
|
- } else { // activity on listener or client ports
|
|
|
- DBG_VERBOSE(printf("UA_Stack_msgLoop - activities on %d handles\n",result));
|
|
|
- UA_list_iteratePayload(&(nl->connections),NL_checkFdSet);
|
|
|
- }
|
|
|
- }
|
|
|
- return UA_SUCCESS;
|
|
|
-}
|
|
|
|
|
|
UA_Int32 NL_TCP_init(NL_data* tld, UA_Int32 port) {
|
|
|
UA_Int32 retval = UA_SUCCESS;
|
|
@@ -287,11 +312,11 @@ UA_Int32 NL_TCP_init(NL_data* tld, UA_Int32 port) {
|
|
|
UA_Int32 retval = UA_SUCCESS;
|
|
|
retval |= UA_alloc((void**)&c,sizeof(NL_Connection));
|
|
|
NL_Connection_init(c, tld, newsockfd, NL_TCP_listen, (TL_Writer) NL_TCP_writer);
|
|
|
- UA_list_addPayloadToBack(&(tld->connections),c);
|
|
|
#ifdef MULTITHREADING
|
|
|
- pthread_create( &(c->readerThreadHandle), NULL, (void*(*)(void*)) NL_TCP_listen, (void*) c);
|
|
|
+ pthread_create( &(c->readerThreadHandle), NULL, (void*(*)(void*)) NL_TCP_listenThread, (void*) c);
|
|
|
#else
|
|
|
- NL_TCP_SetNonBlocking(c->connection.connectionHandle);
|
|
|
+ UA_list_addPayloadToBack(&(tld->connections),c);
|
|
|
+ NL_TCP_SetNonBlocking(c->connection.connectionHandle);
|
|
|
#endif
|
|
|
}
|
|
|
return retval;
|