pubsub_interrupt_publish.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. /* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
  2. * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
  3. *
  4. * Copyright 2018-2019 (c) Kalycito Infotech
  5. * Copyright 2019 (c) Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright 2019 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  7. */
  8. #include <signal.h>
  9. #include <stdio.h>
  10. #include <time.h>
  11. #include <open62541/server.h>
  12. #include <open62541/server_config_default.h>
  13. #include <open62541/plugin/log_stdout.h>
  14. #include <open62541/plugin/pubsub_ethernet.h>
  15. #include "bufmalloc.h"
  16. #define ETH_PUBLISH_ADDRESS "opc.eth://0a-00-27-00-00-08"
  17. #define ETH_INTERFACE "enp0s8"
  18. #define MAX_MEASUREMENTS 10000
  19. #define MILLI_AS_NANO_SECONDS (1000 * 1000)
  20. #define SECONDS_AS_NANO_SECONDS (1000 * 1000 * 1000)
  21. #define CLOCKID CLOCK_MONOTONIC_RAW
  22. #define SIG SIGUSR1
  23. #define PUB_INTERVAL 0.25 /* Publish interval in milliseconds */
  24. #define DATA_SET_WRITER_ID 62541
  25. #define MEASUREMENT_OUTPUT "publisher_measurement.csv"
  26. UA_NodeId counterNodePublisher = {1, UA_NODEIDTYPE_NUMERIC, {1234}};
  27. UA_Int64 pubIntervalNs;
  28. UA_ServerCallback pubCallback = NULL; /* Sentinel if a timer is active */
  29. UA_Server *pubServer;
  30. UA_Boolean running = true;
  31. void *pubData;
  32. timer_t pubEventTimer;
  33. struct sigevent pubEvent;
  34. struct sigaction signalAction;
  35. /* Arrays to store measurement data */
  36. UA_Int32 currentPublishCycleTime[MAX_MEASUREMENTS+1];
  37. struct timespec calculatedCycleStartTime[MAX_MEASUREMENTS+1];
  38. struct timespec cycleStartDelay[MAX_MEASUREMENTS+1];
  39. struct timespec cycleDuration[MAX_MEASUREMENTS+1];
  40. size_t publisherMeasurementsCounter = 0;
  41. /* The value to published */
  42. #define UA_ENABLE_STATICVALUESOURCE
  43. static UA_UInt64 publishValue = 62541;
  44. static UA_StatusCode
  45. readPublishValue(UA_Server *server,
  46. const UA_NodeId *sessionId, void *sessionContext,
  47. const UA_NodeId *nodeId, void *nodeContext,
  48. UA_Boolean sourceTimeStamp, const UA_NumericRange *range,
  49. UA_DataValue *dataValue) {
  50. UA_Variant_setScalarCopy(&dataValue->value, &publishValue,
  51. &UA_TYPES[UA_TYPES_UINT64]);
  52. dataValue->hasValue = true;
  53. return UA_STATUSCODE_GOOD;
  54. }
  55. static void
  56. timespec_diff(struct timespec *start, struct timespec *stop,
  57. struct timespec *result) {
  58. if((stop->tv_nsec - start->tv_nsec) < 0) {
  59. result->tv_sec = stop->tv_sec - start->tv_sec - 1;
  60. result->tv_nsec = stop->tv_nsec - start->tv_nsec + 1000000000;
  61. } else {
  62. result->tv_sec = stop->tv_sec - start->tv_sec;
  63. result->tv_nsec = stop->tv_nsec - start->tv_nsec;
  64. }
  65. }
  66. /* Used to adjust the nanosecond > 1s field value */
  67. static void
  68. nanoSecondFieldConversion(struct timespec *timeSpecValue) {
  69. while(timeSpecValue->tv_nsec > (SECONDS_AS_NANO_SECONDS - 1)) {
  70. timeSpecValue->tv_sec += 1;
  71. timeSpecValue->tv_nsec -= SECONDS_AS_NANO_SECONDS;
  72. }
  73. }
  74. /* Signal handler */
  75. static void
  76. publishInterrupt(int sig, siginfo_t* si, void* uc) {
  77. if(si->si_value.sival_ptr != &pubEventTimer) {
  78. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "stray signal");
  79. return;
  80. }
  81. /* Execute the publish callback in the interrupt */
  82. struct timespec begin, end;
  83. clock_gettime(CLOCKID, &begin);
  84. useMembufAlloc();
  85. pubCallback(pubServer, pubData);
  86. useNormalAlloc();
  87. clock_gettime(CLOCKID, &end);
  88. if(publisherMeasurementsCounter >= MAX_MEASUREMENTS)
  89. return;
  90. /* Save current configured publish interval */
  91. currentPublishCycleTime[publisherMeasurementsCounter] = pubIntervalNs;
  92. /* Save the difference to the calculated time */
  93. timespec_diff(&calculatedCycleStartTime[publisherMeasurementsCounter],
  94. &begin, &cycleStartDelay[publisherMeasurementsCounter]);
  95. /* Save the duration of the publish callback */
  96. timespec_diff(&begin, &end, &cycleDuration[publisherMeasurementsCounter]);
  97. publisherMeasurementsCounter++;
  98. /* Save the calculated starting time for the next cycle */
  99. calculatedCycleStartTime[publisherMeasurementsCounter].tv_nsec =
  100. calculatedCycleStartTime[publisherMeasurementsCounter - 1].tv_nsec + pubIntervalNs;
  101. calculatedCycleStartTime[publisherMeasurementsCounter].tv_sec =
  102. calculatedCycleStartTime[publisherMeasurementsCounter - 1].tv_sec;
  103. nanoSecondFieldConversion(&calculatedCycleStartTime[publisherMeasurementsCounter]);
  104. /* Write the pubsub measurement data */
  105. if(publisherMeasurementsCounter == MAX_MEASUREMENTS) {
  106. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  107. "Logging the measurements to %s", MEASUREMENT_OUTPUT);
  108. FILE *fpPublisher = fopen(MEASUREMENT_OUTPUT, "w");
  109. for(UA_UInt32 i = 0; i < publisherMeasurementsCounter; i++) {
  110. fprintf(fpPublisher, "%u, %u, %ld.%09ld, %ld.%09ld, %ld.%09ld\n",
  111. i,
  112. currentPublishCycleTime[i],
  113. calculatedCycleStartTime[i].tv_sec,
  114. calculatedCycleStartTime[i].tv_nsec,
  115. cycleStartDelay[i].tv_sec,
  116. cycleStartDelay[i].tv_nsec,
  117. cycleDuration[i].tv_sec,
  118. cycleDuration[i].tv_nsec);
  119. }
  120. fclose(fpPublisher);
  121. }
  122. }
  123. /* The following three methods are originally defined in
  124. * /src/pubsub/ua_pubsub_manager.c. We provide a custom implementation here to
  125. * use system interrupts instead if time-triggered callbacks in the OPC UA
  126. * server control flow. */
  127. UA_StatusCode
  128. UA_PubSubManager_addRepeatedCallback(UA_Server *server,
  129. UA_ServerCallback callback,
  130. void *data, UA_Double interval_ms,
  131. UA_UInt64 *callbackId) {
  132. if(pubCallback) {
  133. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  134. "At most one publisher can be registered for interrupt callbacks");
  135. return UA_STATUSCODE_BADINTERNALERROR;
  136. }
  137. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  138. "Adding a publisher with a cycle time of %lf milliseconds", interval_ms);
  139. /* Set global values for the publish callback */
  140. int resultTimerCreate = 0;
  141. pubIntervalNs = (UA_Int64) (interval_ms * MILLI_AS_NANO_SECONDS);
  142. /* Handle the signal */
  143. memset(&signalAction, 0, sizeof(signalAction));
  144. signalAction.sa_flags = SA_SIGINFO;
  145. signalAction.sa_sigaction = publishInterrupt;
  146. sigemptyset(&signalAction.sa_mask);
  147. sigaction(SIG, &signalAction, NULL);
  148. /* Create the timer */
  149. memset(&pubEventTimer, 0, sizeof(pubEventTimer));
  150. pubEvent.sigev_notify = SIGEV_SIGNAL;
  151. pubEvent.sigev_signo = SIG;
  152. pubEvent.sigev_value.sival_ptr = &pubEventTimer;
  153. resultTimerCreate = timer_create(CLOCKID, &pubEvent, &pubEventTimer);
  154. if(resultTimerCreate != 0) {
  155. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  156. "Failed to create a system event with code %i", resultTimerCreate);
  157. return UA_STATUSCODE_BADINTERNALERROR;
  158. }
  159. /* Arm the timer */
  160. struct itimerspec timerspec;
  161. timerspec.it_interval.tv_sec = (long int) (pubIntervalNs / (SECONDS_AS_NANO_SECONDS));
  162. timerspec.it_interval.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  163. timerspec.it_value.tv_sec = (long int) (pubIntervalNs / (SECONDS_AS_NANO_SECONDS));
  164. timerspec.it_value.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  165. resultTimerCreate = timer_settime(pubEventTimer, 0, &timerspec, NULL);
  166. if(resultTimerCreate != 0) {
  167. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  168. "Failed to arm the system timer with code %i", resultTimerCreate);
  169. timer_delete(pubEventTimer);
  170. return UA_STATUSCODE_BADINTERNALERROR;
  171. }
  172. /* Start taking measurements */
  173. publisherMeasurementsCounter = 0;
  174. clock_gettime(CLOCKID, &calculatedCycleStartTime[0]);
  175. calculatedCycleStartTime[0].tv_nsec += pubIntervalNs;
  176. nanoSecondFieldConversion(&calculatedCycleStartTime[0]);
  177. /* Set the callback -- used as a sentinel to detect an operational publisher */
  178. pubServer = server;
  179. pubCallback = callback;
  180. pubData = data;
  181. return UA_STATUSCODE_GOOD;
  182. }
  183. UA_StatusCode
  184. UA_PubSubManager_changeRepeatedCallbackInterval(UA_Server *server,
  185. UA_UInt64 callbackId,
  186. UA_Double interval_ms) {
  187. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  188. "Switching the publisher cycle to %lf milliseconds", interval_ms);
  189. struct itimerspec timerspec;
  190. int resultTimerCreate = 0;
  191. pubIntervalNs = (UA_Int64) (interval_ms * MILLI_AS_NANO_SECONDS);
  192. timerspec.it_interval.tv_sec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  193. timerspec.it_interval.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  194. timerspec.it_value.tv_sec = (long int) (pubIntervalNs / (SECONDS_AS_NANO_SECONDS));
  195. timerspec.it_value.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  196. resultTimerCreate = timer_settime(pubEventTimer, 0, &timerspec, NULL);
  197. if(resultTimerCreate != 0) {
  198. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  199. "Failed to arm the system timer");
  200. timer_delete(pubEventTimer);
  201. return UA_STATUSCODE_BADINTERNALERROR;
  202. }
  203. clock_gettime(CLOCKID, &calculatedCycleStartTime[publisherMeasurementsCounter]);
  204. calculatedCycleStartTime[publisherMeasurementsCounter].tv_nsec += pubIntervalNs;
  205. nanoSecondFieldConversion(&calculatedCycleStartTime[publisherMeasurementsCounter]);
  206. return UA_STATUSCODE_GOOD;
  207. }
  208. void
  209. UA_PubSubManager_removeRepeatedPubSubCallback(UA_Server *server, UA_UInt64 callbackId) {
  210. if(!pubCallback)
  211. return;
  212. timer_delete(pubEventTimer);
  213. pubCallback = NULL; /* So that a new callback can be registered */
  214. }
  215. static void
  216. addPubSubConfiguration(UA_Server* server) {
  217. UA_NodeId connectionIdent;
  218. UA_NodeId publishedDataSetIdent;
  219. UA_NodeId writerGroupIdent;
  220. UA_PubSubConnectionConfig connectionConfig;
  221. memset(&connectionConfig, 0, sizeof(connectionConfig));
  222. connectionConfig.name = UA_STRING("UDP-UADP Connection 1");
  223. connectionConfig.transportProfileUri =
  224. UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp");
  225. connectionConfig.enabled = true;
  226. UA_NetworkAddressUrlDataType networkAddressUrl =
  227. {UA_STRING(ETH_INTERFACE), UA_STRING(ETH_PUBLISH_ADDRESS)};
  228. UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
  229. &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
  230. connectionConfig.publisherId.numeric = UA_UInt32_random();
  231. UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
  232. UA_PublishedDataSetConfig publishedDataSetConfig;
  233. memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
  234. publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
  235. publishedDataSetConfig.name = UA_STRING("Demo PDS");
  236. UA_Server_addPublishedDataSet(server, &publishedDataSetConfig,
  237. &publishedDataSetIdent);
  238. UA_NodeId dataSetFieldIdentCounter;
  239. UA_DataSetFieldConfig counterValue;
  240. memset(&counterValue, 0, sizeof(UA_DataSetFieldConfig));
  241. counterValue.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
  242. counterValue.field.variable.fieldNameAlias = UA_STRING ("Counter Variable 1");
  243. counterValue.field.variable.promotedField = UA_FALSE;
  244. counterValue.field.variable.publishParameters.publishedVariable = counterNodePublisher;
  245. counterValue.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
  246. #ifdef UA_ENABLE_STATICVALUESOURCE
  247. counterValue.field.variable.staticValueSourceEnabled = true;
  248. UA_DataValue_init(&counterValue.field.variable.staticValueSource);
  249. UA_Variant_setScalar(&counterValue.field.variable.staticValueSource.value,
  250. &publishValue, &UA_TYPES[UA_TYPES_UINT64]);
  251. counterValue.field.variable.staticValueSource.value.storageType = UA_VARIANT_DATA_NODELETE;
  252. #endif
  253. UA_Server_addDataSetField(server, publishedDataSetIdent, &counterValue,
  254. &dataSetFieldIdentCounter);
  255. UA_WriterGroupConfig writerGroupConfig;
  256. memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
  257. writerGroupConfig.name = UA_STRING("Demo WriterGroup");
  258. writerGroupConfig.publishingInterval = PUB_INTERVAL;
  259. writerGroupConfig.enabled = UA_FALSE;
  260. writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
  261. UA_Server_addWriterGroup(server, connectionIdent,
  262. &writerGroupConfig, &writerGroupIdent);
  263. UA_NodeId dataSetWriterIdent;
  264. UA_DataSetWriterConfig dataSetWriterConfig;
  265. memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
  266. dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
  267. dataSetWriterConfig.dataSetWriterId = DATA_SET_WRITER_ID;
  268. dataSetWriterConfig.keyFrameCount = 10;
  269. UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
  270. &dataSetWriterConfig, &dataSetWriterIdent);
  271. UA_Server_freezeWriterGroupConfiguration(server, writerGroupIdent);
  272. UA_Server_setWriterGroupOperational(server, writerGroupIdent);
  273. }
  274. static void
  275. addServerNodes(UA_Server* server) {
  276. UA_UInt64 publishValue = 0;
  277. UA_VariableAttributes publisherAttr = UA_VariableAttributes_default;
  278. UA_Variant_setScalar(&publisherAttr.value, &publishValue, &UA_TYPES[UA_TYPES_UINT64]);
  279. publisherAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Publisher Counter");
  280. publisherAttr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
  281. UA_DataSource dataSource;
  282. dataSource.read = readPublishValue;
  283. dataSource.write = NULL;
  284. UA_Server_addDataSourceVariableNode(server, counterNodePublisher,
  285. UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
  286. UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),
  287. UA_QUALIFIEDNAME(1, "Publisher Counter"),
  288. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
  289. publisherAttr, dataSource, NULL, NULL);
  290. }
  291. /* Stop signal */
  292. static void stopHandler(int sign) {
  293. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
  294. running = UA_FALSE;
  295. }
  296. int main(void) {
  297. signal(SIGINT, stopHandler);
  298. signal(SIGTERM, stopHandler);
  299. UA_Server *server = UA_Server_new();
  300. UA_ServerConfig *config = UA_Server_getConfig(server);
  301. UA_ServerConfig_setDefault(config);
  302. config->pubsubTransportLayers = (UA_PubSubTransportLayer *)
  303. UA_malloc(sizeof(UA_PubSubTransportLayer));
  304. config->pubsubTransportLayers[0] = UA_PubSubTransportLayerEthernet();
  305. config->pubsubTransportLayersSize++;
  306. addServerNodes(server);
  307. addPubSubConfiguration(server);
  308. /* Run the server */
  309. UA_StatusCode retval = UA_Server_run(server, &running);
  310. UA_Server_delete(server);
  311. return (int)retval;
  312. }