pubsub_interrupt_publish.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. /* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
  2. * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
  3. *
  4. * Copyright 2018-2019 (c) Kalycito Infotech
  5. * Copyright 2019 (c) Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright 2019 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  7. */
  8. #include <signal.h>
  9. #include <stdio.h>
  10. #include <time.h>
  11. #include <open62541/server.h>
  12. #include <open62541/server_config_default.h>
  13. #include <open62541/plugin/log_stdout.h>
  14. #include <open62541/plugin/pubsub_ethernet.h>
  15. #include "bufmalloc.h"
  16. #define ETH_PUBLISH_ADDRESS "opc.eth://0a-00-27-00-00-08"
  17. #define ETH_INTERFACE "enp0s8"
  18. #define MAX_MEASUREMENTS 10000
  19. #define MILLI_AS_NANO_SECONDS (1000 * 1000)
  20. #define SECONDS_AS_NANO_SECONDS (1000 * 1000 * 1000)
  21. #define CLOCKID CLOCK_MONOTONIC_RAW
  22. #define SIG SIGUSR1
  23. #define PUB_INTERVAL 0.25 /* Publish interval in milliseconds */
  24. #define DATA_SET_WRITER_ID 62541
  25. #define MEASUREMENT_OUTPUT "publisher_measurement.csv"
  26. UA_NodeId counterNodePublisher = {1, UA_NODEIDTYPE_NUMERIC, {1234}};
  27. UA_Int64 pubIntervalNs;
  28. UA_ServerCallback pubCallback = NULL;
  29. UA_Server *pubServer;
  30. UA_Boolean running = true;
  31. void *pubData;
  32. timer_t pubEventTimer;
  33. struct sigevent pubEvent;
  34. struct sigaction signalAction;
  35. /* Arrays to store measurement data */
  36. UA_Int32 currentPublishCycleTime[MAX_MEASUREMENTS+1];
  37. struct timespec calculatedCycleStartTime[MAX_MEASUREMENTS+1];
  38. struct timespec cycleStartDelay[MAX_MEASUREMENTS+1];
  39. struct timespec cycleDuration[MAX_MEASUREMENTS+1];
  40. size_t publisherMeasurementsCounter = 0;
  41. static void
  42. timespec_diff(struct timespec *start, struct timespec *stop,
  43. struct timespec *result) {
  44. if((stop->tv_nsec - start->tv_nsec) < 0) {
  45. result->tv_sec = stop->tv_sec - start->tv_sec - 1;
  46. result->tv_nsec = stop->tv_nsec - start->tv_nsec + 1000000000;
  47. } else {
  48. result->tv_sec = stop->tv_sec - start->tv_sec;
  49. result->tv_nsec = stop->tv_nsec - start->tv_nsec;
  50. }
  51. }
  52. /* Used to adjust the nanosecond > 1s field value */
  53. static void
  54. nanoSecondFieldConversion(struct timespec *timeSpecValue) {
  55. while(timeSpecValue->tv_nsec > (SECONDS_AS_NANO_SECONDS - 1)) {
  56. timeSpecValue->tv_sec += 1;
  57. timeSpecValue->tv_nsec -= SECONDS_AS_NANO_SECONDS;
  58. }
  59. }
  60. /* Signal handler */
  61. static void
  62. publishInterrupt(int sig, siginfo_t* si, void* uc) {
  63. if(si->si_value.sival_ptr != &pubEventTimer) {
  64. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "stray signal");
  65. return;
  66. }
  67. /* Execute the publish callback in the interrupt */
  68. struct timespec begin, end;
  69. clock_gettime(CLOCKID, &begin);
  70. useMembufAlloc();
  71. pubCallback(pubServer, pubData);
  72. useNormalAlloc();
  73. clock_gettime(CLOCKID, &end);
  74. if(publisherMeasurementsCounter >= MAX_MEASUREMENTS)
  75. return;
  76. /* Save current configured publish interval */
  77. currentPublishCycleTime[publisherMeasurementsCounter] = pubIntervalNs;
  78. /* Save the difference to the calculated time */
  79. timespec_diff(&calculatedCycleStartTime[publisherMeasurementsCounter],
  80. &begin, &cycleStartDelay[publisherMeasurementsCounter]);
  81. /* Save the duration of the publish callback */
  82. timespec_diff(&begin, &end, &cycleDuration[publisherMeasurementsCounter]);
  83. publisherMeasurementsCounter++;
  84. /* Save the calculated starting time for the next cycle */
  85. calculatedCycleStartTime[publisherMeasurementsCounter].tv_nsec =
  86. calculatedCycleStartTime[publisherMeasurementsCounter - 1].tv_nsec + pubIntervalNs;
  87. calculatedCycleStartTime[publisherMeasurementsCounter].tv_sec =
  88. calculatedCycleStartTime[publisherMeasurementsCounter - 1].tv_sec;
  89. nanoSecondFieldConversion(&calculatedCycleStartTime[publisherMeasurementsCounter]);
  90. /* Write the pubsub measurement data */
  91. if(publisherMeasurementsCounter == MAX_MEASUREMENTS) {
  92. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  93. "Logging the measurements to %s", MEASUREMENT_OUTPUT);
  94. FILE *fpPublisher = fopen(MEASUREMENT_OUTPUT, "w");
  95. for(UA_UInt32 i = 0; i < publisherMeasurementsCounter; i++) {
  96. fprintf(fpPublisher, "%u, %u, %ld.%09ld, %ld.%09ld, %ld.%09ld\n",
  97. i,
  98. currentPublishCycleTime[i],
  99. calculatedCycleStartTime[i].tv_sec,
  100. calculatedCycleStartTime[i].tv_nsec,
  101. cycleStartDelay[i].tv_sec,
  102. cycleStartDelay[i].tv_nsec,
  103. cycleDuration[i].tv_sec,
  104. cycleDuration[i].tv_nsec);
  105. }
  106. fclose(fpPublisher);
  107. }
  108. }
/* The following three methods are originally defined in
 * /src/pubsub/ua_pubsub_manager.c. We provide a custom implementation here to
 * use system interrupts instead of time-triggered callbacks in the OPC UA
 * server control flow. */
  113. UA_StatusCode
  114. UA_PubSubManager_addRepeatedCallback(UA_Server *server,
  115. UA_ServerCallback callback,
  116. void *data, UA_Double interval_ms,
  117. UA_UInt64 *callbackId) {
  118. if(pubCallback) {
  119. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  120. "At most one publisher can be registered for interrupt callbacks");
  121. return UA_STATUSCODE_BADINTERNALERROR;
  122. }
  123. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  124. "Adding a publisher with a cycle time of %lf milliseconds", interval_ms);
  125. /* Set global values for the publish callback */
  126. int resultTimerCreate = 0;
  127. pubServer = server;
  128. pubCallback = callback;
  129. pubData = data;
  130. pubIntervalNs = (UA_Int64) (interval_ms * MILLI_AS_NANO_SECONDS);
  131. /* Handle the signal */
  132. memset(&signalAction, 0, sizeof(signalAction));
  133. signalAction.sa_flags = SA_SIGINFO;
  134. signalAction.sa_sigaction = publishInterrupt;
  135. sigemptyset(&signalAction.sa_mask);
  136. sigaction(SIG, &signalAction, NULL);
  137. /* Create the timer */
  138. memset(&pubEventTimer, 0, sizeof(pubEventTimer));
  139. pubEvent.sigev_notify = SIGEV_SIGNAL;
  140. pubEvent.sigev_signo = SIG;
  141. pubEvent.sigev_value.sival_ptr = &pubEventTimer;
  142. resultTimerCreate = timer_create(CLOCKID, &pubEvent, &pubEventTimer);
  143. if(resultTimerCreate != 0) {
  144. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  145. "Failed to create a system event");
  146. return UA_STATUSCODE_BADINTERNALERROR;
  147. }
  148. /* Arm the timer */
  149. struct itimerspec timerspec;
  150. timerspec.it_interval.tv_sec = (long int) (pubIntervalNs / (SECONDS_AS_NANO_SECONDS));
  151. timerspec.it_interval.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  152. timerspec.it_value.tv_sec = (long int) (pubIntervalNs / (SECONDS_AS_NANO_SECONDS));
  153. timerspec.it_value.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  154. resultTimerCreate = timer_settime(pubEventTimer, 0, &timerspec, NULL);
  155. if(resultTimerCreate != 0) {
  156. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  157. "Failed to arm the system timer");
  158. timer_delete(pubEventTimer);
  159. return UA_STATUSCODE_BADINTERNALERROR;
  160. }
  161. /* Start taking measurements */
  162. publisherMeasurementsCounter = 0;
  163. clock_gettime(CLOCKID, &calculatedCycleStartTime[0]);
  164. calculatedCycleStartTime[0].tv_nsec += pubIntervalNs;
  165. nanoSecondFieldConversion(&calculatedCycleStartTime[0]);
  166. return UA_STATUSCODE_GOOD;
  167. }
  168. UA_StatusCode
  169. UA_PubSubManager_changeRepeatedCallbackInterval(UA_Server *server,
  170. UA_UInt64 callbackId,
  171. UA_Double interval_ms) {
  172. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  173. "Switching the publisher cycle to %lf milliseconds", interval_ms);
  174. struct itimerspec timerspec;
  175. int resultTimerCreate = 0;
  176. pubIntervalNs = (UA_Int64) (interval_ms * MILLI_AS_NANO_SECONDS);
  177. timerspec.it_interval.tv_sec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  178. timerspec.it_interval.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  179. timerspec.it_value.tv_sec = (long int) (pubIntervalNs / (SECONDS_AS_NANO_SECONDS));
  180. timerspec.it_value.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  181. resultTimerCreate = timer_settime(pubEventTimer, 0, &timerspec, NULL);
  182. if(resultTimerCreate != 0) {
  183. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  184. "Failed to arm the system timer");
  185. timer_delete(pubEventTimer);
  186. return UA_STATUSCODE_BADINTERNALERROR;
  187. }
  188. clock_gettime(CLOCKID, &calculatedCycleStartTime[publisherMeasurementsCounter]);
  189. calculatedCycleStartTime[publisherMeasurementsCounter].tv_nsec += pubIntervalNs;
  190. nanoSecondFieldConversion(&calculatedCycleStartTime[publisherMeasurementsCounter]);
  191. return UA_STATUSCODE_GOOD;
  192. }
  193. void
  194. UA_PubSubManager_removeRepeatedPubSubCallback(UA_Server *server, UA_UInt64 callbackId) {
  195. timer_delete(pubEventTimer);
  196. pubCallback = NULL; /* So that a new callback can be registered */
  197. }
  198. static void
  199. addPubSubConfiguration(UA_Server* server) {
  200. UA_NodeId connectionIdent;
  201. UA_NodeId publishedDataSetIdent;
  202. UA_NodeId writerGroupIdent;
  203. UA_PubSubConnectionConfig connectionConfig;
  204. memset(&connectionConfig, 0, sizeof(connectionConfig));
  205. connectionConfig.name = UA_STRING("UDP-UADP Connection 1");
  206. connectionConfig.transportProfileUri =
  207. UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp");
  208. connectionConfig.enabled = true;
  209. UA_NetworkAddressUrlDataType networkAddressUrl =
  210. {UA_STRING(ETH_INTERFACE), UA_STRING(ETH_PUBLISH_ADDRESS)};
  211. UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
  212. &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
  213. connectionConfig.publisherId.numeric = UA_UInt32_random();
  214. UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
  215. UA_PublishedDataSetConfig publishedDataSetConfig;
  216. memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
  217. publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
  218. publishedDataSetConfig.name = UA_STRING("Demo PDS");
  219. UA_Server_addPublishedDataSet(server, &publishedDataSetConfig,
  220. &publishedDataSetIdent);
  221. UA_NodeId dataSetFieldIdentCounter;
  222. UA_DataSetFieldConfig counterValue;
  223. memset(&counterValue, 0, sizeof(UA_DataSetFieldConfig));
  224. counterValue.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
  225. counterValue.field.variable.fieldNameAlias = UA_STRING ("Counter Variable 1");
  226. counterValue.field.variable.promotedField = UA_FALSE;
  227. counterValue.field.variable.publishParameters.publishedVariable = counterNodePublisher;
  228. counterValue.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
  229. UA_Server_addDataSetField(server, publishedDataSetIdent, &counterValue,
  230. &dataSetFieldIdentCounter);
  231. UA_WriterGroupConfig writerGroupConfig;
  232. memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
  233. writerGroupConfig.name = UA_STRING("Demo WriterGroup");
  234. writerGroupConfig.publishingInterval = PUB_INTERVAL;
  235. writerGroupConfig.enabled = UA_FALSE;
  236. writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
  237. UA_Server_addWriterGroup(server, connectionIdent,
  238. &writerGroupConfig, &writerGroupIdent);
  239. UA_NodeId dataSetWriterIdent;
  240. UA_DataSetWriterConfig dataSetWriterConfig;
  241. memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
  242. dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
  243. dataSetWriterConfig.dataSetWriterId = DATA_SET_WRITER_ID;
  244. dataSetWriterConfig.keyFrameCount = 10;
  245. UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
  246. &dataSetWriterConfig, &dataSetWriterIdent);
  247. }
  248. static void
  249. addServerNodes(UA_Server* server) {
  250. UA_UInt64 publishValue = 0;
  251. UA_VariableAttributes publisherAttr = UA_VariableAttributes_default;
  252. UA_Variant_setScalar(&publisherAttr.value, &publishValue, &UA_TYPES[UA_TYPES_UINT64]);
  253. publisherAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Publisher Counter");
  254. publisherAttr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
  255. UA_Server_addVariableNode(server, counterNodePublisher,
  256. UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
  257. UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),
  258. UA_QUALIFIEDNAME(1, "Publisher Counter"),
  259. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
  260. publisherAttr, NULL, NULL);
  261. }
  262. /* Stop signal */
  263. static void stopHandler(int sign) {
  264. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
  265. running = UA_FALSE;
  266. }
  267. int main(void) {
  268. signal(SIGINT, stopHandler);
  269. signal(SIGTERM, stopHandler);
  270. UA_Server *server = UA_Server_new();
  271. UA_ServerConfig *config = UA_Server_getConfig(server);
  272. UA_ServerConfig_setDefault(config);
  273. config->pubsubTransportLayers = (UA_PubSubTransportLayer *)
  274. UA_malloc(sizeof(UA_PubSubTransportLayer));
  275. config->pubsubTransportLayers[0] = UA_PubSubTransportLayerEthernet();
  276. config->pubsubTransportLayersSize++;
  277. addServerNodes(server);
  278. addPubSubConfiguration(server);
  279. /* Run the server */
  280. UA_StatusCode retval = UA_Server_run(server, &running);
  281. UA_Server_delete(server);
  282. UA_ServerConfig_delete(config);
  283. return (int)retval;
  284. }