/* mt_testing.h 3.3 KB */
  1. #ifndef OPEN62541_MT_TESTING_H
  2. #define OPEN62541_MT_TESTING_H
  3. #include <open62541/server_config_default.h>
  /* Opens the definition of a static thread entry point called `name`.
   * The entry point takes one void* argument, visible in the body as `param`,
   * and returns void*. Follow the macro with the function body in braces. */
  #define THREAD_CALLBACK_PARAM(name, param) \
      static void * name(void *param)

  /* Starts a POSIX thread with default attributes that runs `callback`.
   * The new thread receives the ADDRESS of `param` (not a copy), so `param`
   * must be an lvalue that stays valid until the thread has read it.
   * The expression evaluates to the status returned by pthread_create(). */
  #define THREAD_CREATE_PARAM(handle, callback, param) \
      pthread_create(&(handle), NULL, callback, (void*) &(param))
  6. typedef struct {
  7. void (*func)(void *param); //function to execute
  8. size_t counter; //index of the iteration
  9. size_t index; // index within workerContext array of global TestContext
  10. size_t upperBound; //number of iterations each thread schould execute func
  11. THREAD_HANDLE handle;
  12. } ThreadContext;
  13. typedef struct {
  14. size_t numberOfWorkers;
  15. ThreadContext *workerContext;
  16. size_t numberofClients;
  17. ThreadContext *clientContext;
  18. UA_Boolean running;
  19. UA_Server *server;
  20. UA_Client **clients;
  21. void (*checkServerNodes)(void);
  22. } TestContext;
  23. TestContext tc;
  24. THREAD_HANDLE server_thread;
  25. THREAD_CALLBACK(serverloop) {
  26. while(tc.running)
  27. UA_Server_run_iterate(tc.server, true);
  28. return 0;
  29. }
  30. static
  31. void teardown(void) {
  32. for (size_t i = 0; i < tc.numberOfWorkers; i++)
  33. THREAD_JOIN(tc.workerContext[i].handle);
  34. for (size_t i = 0; i < tc.numberofClients; i++)
  35. THREAD_JOIN(tc.clientContext[i].handle);
  36. tc.running = false;
  37. THREAD_JOIN(server_thread);
  38. if (tc.checkServerNodes)
  39. tc.checkServerNodes();
  40. UA_Server_run_shutdown(tc.server);
  41. UA_Server_delete(tc.server);
  42. }
  43. THREAD_CALLBACK_PARAM(workerLoop, val) {
  44. ThreadContext tmp = (*(ThreadContext *) val);
  45. for (size_t i = 0; i < tmp.upperBound; i++) {
  46. tmp.counter = i;
  47. tmp.func(&tmp);
  48. }
  49. return NULL;
  50. }
  51. THREAD_CALLBACK_PARAM(clientLoop, val) {
  52. ThreadContext tmp = (*(ThreadContext *) val);
  53. tc.clients[tmp.index] = UA_Client_new();
  54. UA_ClientConfig_setDefault(UA_Client_getConfig(tc.clients[tmp.index]));
  55. UA_StatusCode retval = UA_Client_connect(tc.clients[tmp.index], "opc.tcp://localhost:4840");
  56. ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
  57. for(size_t i = 0; i < tmp.upperBound; i++) {
  58. tmp.counter = i;
  59. tmp.func(&tmp);
  60. }
  61. UA_Client_disconnect(tc.clients[tmp.index]);
  62. UA_Client_delete(tc.clients[tmp.index]);
  63. return NULL;
  64. }
  65. static UA_INLINE void
  66. initThreadContext(size_t numberOfWorkers, size_t numberOfClients, void (*checkServerNodes)(void)) {
  67. tc.numberOfWorkers = numberOfWorkers;
  68. tc.numberofClients = numberOfClients;
  69. tc.checkServerNodes = checkServerNodes;
  70. tc.workerContext = (ThreadContext*) UA_calloc(tc.numberOfWorkers, sizeof(ThreadContext));
  71. tc.clients = (UA_Client**) UA_calloc(tc.numberofClients, sizeof(UA_Client*));
  72. tc.clientContext = (ThreadContext*) UA_calloc(tc.numberofClients, sizeof(ThreadContext));
  73. }
  74. static UA_INLINE void
  75. setThreadContext(ThreadContext *workerContext, size_t index, size_t upperBound, void (*func)(void *param)) {
  76. workerContext->index = index;
  77. workerContext->upperBound = upperBound;
  78. workerContext->func = func;
  79. }
  80. static UA_INLINE void
  81. startMultithreading(void) {
  82. for (size_t i = 0; i < tc.numberOfWorkers; i++)
  83. THREAD_CREATE_PARAM(tc.workerContext[i].handle, workerLoop, tc.workerContext[i]);
  84. for (size_t i = 0; i < tc.numberofClients; i++)
  85. THREAD_CREATE_PARAM(tc.clientContext[i].handle, clientLoop, tc.clientContext[i]);
  86. }
  87. #endif //OPEN62541_MT_TESTING_H