pubsub_interrupt_publish.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. /* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
  2. * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
  3. *
  4. * Copyright 2018-2019 (c) Kalycito Infotech
  5. * Copyright 2019 (c) Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright 2019 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  7. */
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <open62541/server.h>
#include <open62541/server_config_default.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/plugin/pubsub_ethernet.h>

#include "bufmalloc.h"
  16. #define ETH_PUBLISH_ADDRESS "opc.eth://0a-00-27-00-00-08"
  17. #define ETH_INTERFACE "enp0s8"
  18. #define MAX_MEASUREMENTS 10000
  19. #define MILLI_AS_NANO_SECONDS (1000 * 1000)
  20. #define SECONDS_AS_NANO_SECONDS (1000 * 1000 * 1000)
  21. #define CLOCKID CLOCK_MONOTONIC_RAW
  22. #define SIG SIGUSR1
  23. #define PUB_INTERVAL 0.25 /* Publish interval in milliseconds */
  24. #define DATA_SET_WRITER_ID 62541
  25. #define MEASUREMENT_OUTPUT "publisher_measurement.csv"
  26. UA_NodeId counterNodePublisher = {1, UA_NODEIDTYPE_NUMERIC, {1234}};
  27. UA_Int64 pubIntervalNs;
  28. UA_ServerCallback pubCallback = NULL; /* Sentinel if a timer is active */
  29. UA_Server *pubServer;
  30. UA_Boolean running = true;
  31. void *pubData;
  32. timer_t pubEventTimer;
  33. struct sigevent pubEvent;
  34. struct sigaction signalAction;
  35. /* Arrays to store measurement data */
  36. UA_Int32 currentPublishCycleTime[MAX_MEASUREMENTS+1];
  37. struct timespec calculatedCycleStartTime[MAX_MEASUREMENTS+1];
  38. struct timespec cycleStartDelay[MAX_MEASUREMENTS+1];
  39. struct timespec cycleDuration[MAX_MEASUREMENTS+1];
  40. size_t publisherMeasurementsCounter = 0;
  41. /* The RT level of the publisher */
  42. //#define PUBSUB_RT_LEVEL UA_PUBSUB_RT_NONE
  43. //#define PUBSUB_RT_LEVEL UA_PUBSUB_RT_DIRECT_VALUE_ACCESS
  44. #define PUBSUB_RT_LEVEL UA_PUBSUB_RT_FIXED_SIZE
  45. /* The value to published */
  46. static UA_UInt64 publishValue = 62541;
  47. static UA_StatusCode
  48. readPublishValue(UA_Server *server,
  49. const UA_NodeId *sessionId, void *sessionContext,
  50. const UA_NodeId *nodeId, void *nodeContext,
  51. UA_Boolean sourceTimeStamp, const UA_NumericRange *range,
  52. UA_DataValue *dataValue) {
  53. UA_Variant_setScalarCopy(&dataValue->value, &publishValue,
  54. &UA_TYPES[UA_TYPES_UINT64]);
  55. dataValue->hasValue = true;
  56. return UA_STATUSCODE_GOOD;
  57. }
  58. static void
  59. timespec_diff(struct timespec *start, struct timespec *stop,
  60. struct timespec *result) {
  61. if((stop->tv_nsec - start->tv_nsec) < 0) {
  62. result->tv_sec = stop->tv_sec - start->tv_sec - 1;
  63. result->tv_nsec = stop->tv_nsec - start->tv_nsec + 1000000000;
  64. } else {
  65. result->tv_sec = stop->tv_sec - start->tv_sec;
  66. result->tv_nsec = stop->tv_nsec - start->tv_nsec;
  67. }
  68. }
  69. /* Used to adjust the nanosecond > 1s field value */
  70. static void
  71. nanoSecondFieldConversion(struct timespec *timeSpecValue) {
  72. while(timeSpecValue->tv_nsec > (SECONDS_AS_NANO_SECONDS - 1)) {
  73. timeSpecValue->tv_sec += 1;
  74. timeSpecValue->tv_nsec -= SECONDS_AS_NANO_SECONDS;
  75. }
  76. }
  77. /* Signal handler */
  78. static void
  79. publishInterrupt(int sig, siginfo_t* si, void* uc) {
  80. if(si->si_value.sival_ptr != &pubEventTimer) {
  81. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "stray signal");
  82. return;
  83. }
  84. /* Execute the publish callback in the interrupt */
  85. struct timespec begin, end;
  86. clock_gettime(CLOCKID, &begin);
  87. useMembufAlloc();
  88. pubCallback(pubServer, pubData);
  89. useNormalAlloc();
  90. clock_gettime(CLOCKID, &end);
  91. if(publisherMeasurementsCounter >= MAX_MEASUREMENTS)
  92. return;
  93. /* Save current configured publish interval */
  94. currentPublishCycleTime[publisherMeasurementsCounter] = pubIntervalNs;
  95. /* Save the difference to the calculated time */
  96. timespec_diff(&calculatedCycleStartTime[publisherMeasurementsCounter],
  97. &begin, &cycleStartDelay[publisherMeasurementsCounter]);
  98. /* Save the duration of the publish callback */
  99. timespec_diff(&begin, &end, &cycleDuration[publisherMeasurementsCounter]);
  100. publisherMeasurementsCounter++;
  101. /* Save the calculated starting time for the next cycle */
  102. calculatedCycleStartTime[publisherMeasurementsCounter].tv_nsec =
  103. calculatedCycleStartTime[publisherMeasurementsCounter - 1].tv_nsec + pubIntervalNs;
  104. calculatedCycleStartTime[publisherMeasurementsCounter].tv_sec =
  105. calculatedCycleStartTime[publisherMeasurementsCounter - 1].tv_sec;
  106. nanoSecondFieldConversion(&calculatedCycleStartTime[publisherMeasurementsCounter]);
  107. /* Write the pubsub measurement data */
  108. if(publisherMeasurementsCounter == MAX_MEASUREMENTS) {
  109. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  110. "Logging the measurements to %s", MEASUREMENT_OUTPUT);
  111. FILE *fpPublisher = fopen(MEASUREMENT_OUTPUT, "w");
  112. for(UA_UInt32 i = 0; i < publisherMeasurementsCounter; i++) {
  113. fprintf(fpPublisher, "%u, %u, %ld.%09ld, %ld.%09ld, %ld.%09ld\n",
  114. i,
  115. currentPublishCycleTime[i],
  116. calculatedCycleStartTime[i].tv_sec,
  117. calculatedCycleStartTime[i].tv_nsec,
  118. cycleStartDelay[i].tv_sec,
  119. cycleStartDelay[i].tv_nsec,
  120. cycleDuration[i].tv_sec,
  121. cycleDuration[i].tv_nsec);
  122. }
  123. fclose(fpPublisher);
  124. }
  125. }
/* The following three methods are originally defined in
 * /src/pubsub/ua_pubsub_manager.c. We provide a custom implementation here to
 * use system interrupts instead of time-triggered callbacks in the OPC UA
 * server control flow. */
  130. UA_StatusCode
  131. UA_PubSubManager_addRepeatedCallback(UA_Server *server,
  132. UA_ServerCallback callback,
  133. void *data, UA_Double interval_ms,
  134. UA_UInt64 *callbackId) {
  135. if(pubCallback) {
  136. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  137. "At most one publisher can be registered for interrupt callbacks");
  138. return UA_STATUSCODE_BADINTERNALERROR;
  139. }
  140. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  141. "Adding a publisher with a cycle time of %lf milliseconds", interval_ms);
  142. /* Set global values for the publish callback */
  143. int resultTimerCreate = 0;
  144. pubIntervalNs = (UA_Int64) (interval_ms * MILLI_AS_NANO_SECONDS);
  145. /* Handle the signal */
  146. memset(&signalAction, 0, sizeof(signalAction));
  147. signalAction.sa_flags = SA_SIGINFO;
  148. signalAction.sa_sigaction = publishInterrupt;
  149. sigemptyset(&signalAction.sa_mask);
  150. sigaction(SIG, &signalAction, NULL);
  151. /* Create the timer */
  152. memset(&pubEventTimer, 0, sizeof(pubEventTimer));
  153. pubEvent.sigev_notify = SIGEV_SIGNAL;
  154. pubEvent.sigev_signo = SIG;
  155. pubEvent.sigev_value.sival_ptr = &pubEventTimer;
  156. resultTimerCreate = timer_create(CLOCKID, &pubEvent, &pubEventTimer);
  157. if(resultTimerCreate != 0) {
  158. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  159. "Failed to create a system event with code %s",
  160. strerror(errno));
  161. return UA_STATUSCODE_BADINTERNALERROR;
  162. }
  163. /* Arm the timer */
  164. struct itimerspec timerspec;
  165. timerspec.it_interval.tv_sec = (long int) (pubIntervalNs / (SECONDS_AS_NANO_SECONDS));
  166. timerspec.it_interval.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  167. timerspec.it_value.tv_sec = (long int) (pubIntervalNs / (SECONDS_AS_NANO_SECONDS));
  168. timerspec.it_value.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  169. resultTimerCreate = timer_settime(pubEventTimer, 0, &timerspec, NULL);
  170. if(resultTimerCreate != 0) {
  171. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  172. "Failed to arm the system timer with code %i", resultTimerCreate);
  173. timer_delete(pubEventTimer);
  174. return UA_STATUSCODE_BADINTERNALERROR;
  175. }
  176. /* Start taking measurements */
  177. publisherMeasurementsCounter = 0;
  178. clock_gettime(CLOCKID, &calculatedCycleStartTime[0]);
  179. calculatedCycleStartTime[0].tv_nsec += pubIntervalNs;
  180. nanoSecondFieldConversion(&calculatedCycleStartTime[0]);
  181. /* Set the callback -- used as a sentinel to detect an operational publisher */
  182. pubServer = server;
  183. pubCallback = callback;
  184. pubData = data;
  185. return UA_STATUSCODE_GOOD;
  186. }
  187. UA_StatusCode
  188. UA_PubSubManager_changeRepeatedCallbackInterval(UA_Server *server,
  189. UA_UInt64 callbackId,
  190. UA_Double interval_ms) {
  191. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  192. "Switching the publisher cycle to %lf milliseconds", interval_ms);
  193. struct itimerspec timerspec;
  194. int resultTimerCreate = 0;
  195. pubIntervalNs = (UA_Int64) (interval_ms * MILLI_AS_NANO_SECONDS);
  196. timerspec.it_interval.tv_sec = (long int) (pubIntervalNs / SECONDS_AS_NANO_SECONDS);
  197. timerspec.it_interval.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  198. timerspec.it_value.tv_sec = (long int) (pubIntervalNs / (SECONDS_AS_NANO_SECONDS));
  199. timerspec.it_value.tv_nsec = (long int) (pubIntervalNs % SECONDS_AS_NANO_SECONDS);
  200. resultTimerCreate = timer_settime(pubEventTimer, 0, &timerspec, NULL);
  201. if(resultTimerCreate != 0) {
  202. UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
  203. "Failed to arm the system timer");
  204. timer_delete(pubEventTimer);
  205. return UA_STATUSCODE_BADINTERNALERROR;
  206. }
  207. clock_gettime(CLOCKID, &calculatedCycleStartTime[publisherMeasurementsCounter]);
  208. calculatedCycleStartTime[publisherMeasurementsCounter].tv_nsec += pubIntervalNs;
  209. nanoSecondFieldConversion(&calculatedCycleStartTime[publisherMeasurementsCounter]);
  210. return UA_STATUSCODE_GOOD;
  211. }
  212. void
  213. UA_PubSubManager_removeRepeatedPubSubCallback(UA_Server *server, UA_UInt64 callbackId) {
  214. if(!pubCallback)
  215. return;
  216. timer_delete(pubEventTimer);
  217. pubCallback = NULL; /* So that a new callback can be registered */
  218. }
  219. static void
  220. addPubSubConfiguration(UA_Server* server) {
  221. UA_NodeId connectionIdent;
  222. UA_NodeId publishedDataSetIdent;
  223. UA_NodeId writerGroupIdent;
  224. UA_PubSubConnectionConfig connectionConfig;
  225. memset(&connectionConfig, 0, sizeof(connectionConfig));
  226. connectionConfig.name = UA_STRING("UDP-UADP Connection 1");
  227. connectionConfig.transportProfileUri =
  228. UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp");
  229. connectionConfig.enabled = true;
  230. UA_NetworkAddressUrlDataType networkAddressUrl =
  231. {UA_STRING(ETH_INTERFACE), UA_STRING(ETH_PUBLISH_ADDRESS)};
  232. UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
  233. &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
  234. connectionConfig.publisherId.numeric = UA_UInt32_random();
  235. UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
  236. UA_PublishedDataSetConfig publishedDataSetConfig;
  237. memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
  238. publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
  239. publishedDataSetConfig.name = UA_STRING("Demo PDS");
  240. UA_Server_addPublishedDataSet(server, &publishedDataSetConfig,
  241. &publishedDataSetIdent);
  242. UA_NodeId dataSetFieldIdentCounter;
  243. UA_DataSetFieldConfig counterValue;
  244. memset(&counterValue, 0, sizeof(UA_DataSetFieldConfig));
  245. counterValue.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
  246. counterValue.field.variable.fieldNameAlias = UA_STRING ("Counter Variable 1");
  247. counterValue.field.variable.promotedField = UA_FALSE;
  248. counterValue.field.variable.publishParameters.publishedVariable = counterNodePublisher;
  249. counterValue.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
  250. #if (PUBSUB_RT_LEVEL == UA_PUBSUB_RT_FIXED_SIZE) || (PUBSUB_RT_LEVEL == UA_PUBSUB_RT_DIRECT_VALUE_ACCESS)
  251. counterValue.field.variable.staticValueSourceEnabled = true;
  252. UA_DataValue_init(&counterValue.field.variable.staticValueSource);
  253. UA_Variant_setScalar(&counterValue.field.variable.staticValueSource.value,
  254. &publishValue, &UA_TYPES[UA_TYPES_UINT64]);
  255. counterValue.field.variable.staticValueSource.value.storageType = UA_VARIANT_DATA_NODELETE;
  256. #endif
  257. UA_Server_addDataSetField(server, publishedDataSetIdent, &counterValue,
  258. &dataSetFieldIdentCounter);
  259. UA_WriterGroupConfig writerGroupConfig;
  260. memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
  261. writerGroupConfig.name = UA_STRING("Demo WriterGroup");
  262. writerGroupConfig.publishingInterval = PUB_INTERVAL;
  263. writerGroupConfig.enabled = UA_FALSE;
  264. writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
  265. writerGroupConfig.rtLevel = PUBSUB_RT_LEVEL;
  266. UA_Server_addWriterGroup(server, connectionIdent,
  267. &writerGroupConfig, &writerGroupIdent);
  268. UA_NodeId dataSetWriterIdent;
  269. UA_DataSetWriterConfig dataSetWriterConfig;
  270. memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
  271. dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
  272. dataSetWriterConfig.dataSetWriterId = DATA_SET_WRITER_ID;
  273. dataSetWriterConfig.keyFrameCount = 10;
  274. UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
  275. &dataSetWriterConfig, &dataSetWriterIdent);
  276. UA_Server_freezeWriterGroupConfiguration(server, writerGroupIdent);
  277. UA_Server_setWriterGroupOperational(server, writerGroupIdent);
  278. }
  279. static void
  280. addServerNodes(UA_Server* server) {
  281. UA_UInt64 publishValue = 0;
  282. UA_VariableAttributes publisherAttr = UA_VariableAttributes_default;
  283. UA_Variant_setScalar(&publisherAttr.value, &publishValue, &UA_TYPES[UA_TYPES_UINT64]);
  284. publisherAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Publisher Counter");
  285. publisherAttr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
  286. UA_DataSource dataSource;
  287. dataSource.read = readPublishValue;
  288. dataSource.write = NULL;
  289. UA_Server_addDataSourceVariableNode(server, counterNodePublisher,
  290. UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
  291. UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),
  292. UA_QUALIFIEDNAME(1, "Publisher Counter"),
  293. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
  294. publisherAttr, dataSource, NULL, NULL);
  295. }
  296. /* Stop signal */
  297. static void stopHandler(int sign) {
  298. UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
  299. running = UA_FALSE;
  300. }
  301. int main(void) {
  302. signal(SIGINT, stopHandler);
  303. signal(SIGTERM, stopHandler);
  304. UA_Server *server = UA_Server_new();
  305. UA_ServerConfig *config = UA_Server_getConfig(server);
  306. UA_ServerConfig_setDefault(config);
  307. config->pubsubTransportLayers = (UA_PubSubTransportLayer *)
  308. UA_malloc(sizeof(UA_PubSubTransportLayer));
  309. config->pubsubTransportLayers[0] = UA_PubSubTransportLayerEthernet();
  310. config->pubsubTransportLayersSize++;
  311. addServerNodes(server);
  312. addPubSubConfiguration(server);
  313. /* Run the server */
  314. UA_StatusCode retval = UA_Server_run(server, &running);
  315. UA_Server_delete(server);
  316. return (int)retval;
  317. }