mqtt.c 57 KB


  1. /*
  2. MIT License
  3. Copyright (c) 2018 Liam Bindle
  4. Permission is hereby granted, free of charge, to any person obtaining a copy
  5. of this software and associated documentation files (the "Software"), to deal
  6. in the Software without restriction, including without limitation the rights
  7. to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. copies of the Software, and to permit persons to whom the Software is
  9. furnished to do so, subject to the following conditions:
  10. The above copyright notice and this permission notice shall be included in all
  11. copies or substantial portions of the Software.
  12. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  13. IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  15. AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  18. SOFTWARE.
  19. */
  20. /**
  21. * @file
  22. * @brief Implements the functionality of MQTT-C.
  23. * @note The only files that are included are mqtt.h and mqtt_pal.h.
  24. *
  25. * @cond Doxygen_Suppress
  26. */
  27. #include "mqtt.h"
  28. enum MQTTErrors mqtt_sync(struct mqtt_client *client) {
  29. /* Recover from any errors */
  30. MQTT_PAL_MUTEX_LOCK(&client->mutex);
  31. if (client->error != MQTT_OK && client->reconnect_callback != NULL) {
  32. client->reconnect_callback(client, &client->reconnect_state);
  33. /* unlocked during CONNECT */
  34. } else {
  35. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  36. }
  37. /* Call inspector callback if necessary */
  38. enum MQTTErrors err;
  39. if (client->inspector_callback != NULL) {
  40. MQTT_PAL_MUTEX_LOCK(&client->mutex);
  41. err = client->inspector_callback(client);
  42. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  43. if (err != MQTT_OK) return err;
  44. }
  45. /* Call receive */
  46. err = (enum MQTTErrors)__mqtt_recv(client);
  47. if (err != MQTT_OK) return err;
  48. /* Call send */
  49. err = (enum MQTTErrors)__mqtt_send(client);
  50. return err;
  51. }
  52. uint16_t __mqtt_next_pid(struct mqtt_client *client) {
  53. if (client->pid_lfsr == 0) {
  54. client->pid_lfsr = 163u;
  55. }
  56. /* LFSR taps taken from: https://en.wikipedia.org/wiki/Linear-feedback_shift_register */
  57. int pid_exists = 0;
  58. do {
  59. unsigned lsb = client->pid_lfsr & 1;
  60. (client->pid_lfsr) >>= 1;
  61. if (lsb) {
  62. client->pid_lfsr ^= 0xB400u;
  63. }
  64. /* check that the PID is unique */
  65. pid_exists = 0;
  66. struct mqtt_queued_message *curr;
  67. for(curr = mqtt_mq_get(&(client->mq), 0); curr >= client->mq.queue_tail; --curr) {
  68. if (curr->packet_id == client->pid_lfsr) {
  69. pid_exists = 1;
  70. break;
  71. }
  72. }
  73. } while(pid_exists);
  74. return client->pid_lfsr;
  75. }
  76. enum MQTTErrors mqtt_init(struct mqtt_client *client,
  77. mqtt_pal_socket_handle sockfd,
  78. uint8_t *sendbuf, size_t sendbufsz,
  79. uint8_t *recvbuf, size_t recvbufsz,
  80. void (*publish_response_callback)(void** state,struct mqtt_response_publish *publish))
  81. {
  82. if (client == NULL || sendbuf == NULL || recvbuf == NULL) {
  83. return MQTT_ERROR_NULLPTR;
  84. }
  85. /* initialize mutex */
  86. MQTT_PAL_MUTEX_INIT(&client->mutex);
  87. MQTT_PAL_MUTEX_LOCK(&client->mutex); /* unlocked during CONNECT */
  88. client->socketfd = sockfd;
  89. mqtt_mq_init(&client->mq, sendbuf, sendbufsz);
  90. client->recv_buffer.mem_start = recvbuf;
  91. client->recv_buffer.mem_size = recvbufsz;
  92. client->recv_buffer.curr = client->recv_buffer.mem_start;
  93. client->recv_buffer.curr_sz = client->recv_buffer.mem_size;
  94. client->error = MQTT_ERROR_CONNECT_NOT_CALLED;
  95. client->response_timeout = 30;
  96. client->number_of_timeouts = 0;
  97. client->number_of_keep_alives = 0;
  98. client->typical_response_time = -1.0;
  99. client->publish_response_callback = publish_response_callback;
  100. client->inspector_callback = NULL;
  101. client->reconnect_callback = NULL;
  102. client->reconnect_state = NULL;
  103. return MQTT_OK;
  104. }
  105. void mqtt_init_reconnect(struct mqtt_client *client,
  106. void (*reconnect)(struct mqtt_client *, void**),
  107. void *reconnect_state,
  108. void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish))
  109. {
  110. /* initialize mutex */
  111. MQTT_PAL_MUTEX_INIT(&client->mutex);
  112. client->socketfd = (mqtt_pal_socket_handle) -1;
  113. mqtt_mq_init(&client->mq, NULL, 0);
  114. client->recv_buffer.mem_start = NULL;
  115. client->recv_buffer.mem_size = 0;
  116. client->recv_buffer.curr = NULL;
  117. client->recv_buffer.curr_sz = 0;
  118. client->error = MQTT_ERROR_INITIAL_RECONNECT;
  119. client->response_timeout = 30;
  120. client->number_of_timeouts = 0;
  121. client->number_of_keep_alives = 0;
  122. client->typical_response_time = -1.0;
  123. client->publish_response_callback = publish_response_callback;
  124. client->inspector_callback = NULL;
  125. client->reconnect_callback = reconnect;
  126. client->reconnect_state = reconnect_state;
  127. }
  128. void mqtt_reinit(struct mqtt_client* client,
  129. mqtt_pal_socket_handle socketfd,
  130. uint8_t *sendbuf, size_t sendbufsz,
  131. uint8_t *recvbuf, size_t recvbufsz)
  132. {
  133. client->error = MQTT_ERROR_CONNECT_NOT_CALLED;
  134. client->socketfd = socketfd;
  135. mqtt_mq_init(&client->mq, sendbuf, sendbufsz);
  136. client->recv_buffer.mem_start = recvbuf;
  137. client->recv_buffer.mem_size = recvbufsz;
  138. client->recv_buffer.curr = client->recv_buffer.mem_start;
  139. client->recv_buffer.curr_sz = client->recv_buffer.mem_size;
  140. }
  141. /**
  142. * A macro function that:
  143. * 1) Checks that the client isn't in an error state.
  144. * 2) Attempts to pack to client's message queue.
  145. * a) handles errors
  146. * b) if mq buffer is too small, cleans it and tries again
  147. * 3) Upon successful pack, registers the new message.
  148. */
  149. #define MQTT_CLIENT_TRY_PACK(tmp, msg, client, pack_call, release) \
  150. if (client->error < 0) { \
  151. if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
  152. return client->error; \
  153. } \
  154. tmp = pack_call; \
  155. if (tmp < 0) { \
  156. client->error = (enum MQTTErrors)tmp; \
  157. if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
  158. return (enum MQTTErrors)tmp; \
  159. } else if (tmp == 0) { \
  160. mqtt_mq_clean(&client->mq); \
  161. tmp = pack_call; \
  162. if (tmp < 0) { \
  163. client->error = (enum MQTTErrors)tmp; \
  164. if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
  165. return (enum MQTTErrors)tmp; \
  166. } else if(tmp == 0) { \
  167. client->error = MQTT_ERROR_SEND_BUFFER_IS_FULL; \
  168. if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
  169. return (enum MQTTErrors)MQTT_ERROR_SEND_BUFFER_IS_FULL; \
  170. } \
  171. } \
  172. msg = mqtt_mq_register(&client->mq, (size_t)tmp); \
  173. enum MQTTErrors mqtt_connect(struct mqtt_client *client,
  174. const char* client_id,
  175. const char* will_topic,
  176. const void* will_message,
  177. size_t will_message_size,
  178. const char* user_name,
  179. const char* password,
  180. uint8_t connect_flags,
  181. uint16_t keep_alive)
  182. {
  183. ssize_t rv;
  184. struct mqtt_queued_message *msg;
  185. /* Note: Current thread already has mutex locked. */
  186. /* update the client's state */
  187. client->keep_alive = keep_alive;
  188. if (client->error == MQTT_ERROR_CONNECT_NOT_CALLED) {
  189. client->error = MQTT_OK;
  190. }
  191. /* try to pack the message */
  192. MQTT_CLIENT_TRY_PACK(rv, msg, client,
  193. mqtt_pack_connection_request(
  194. client->mq.curr, client->mq.curr_sz,
  195. client_id, will_topic, will_message,
  196. will_message_size,user_name, password,
  197. connect_flags, keep_alive
  198. ),
  199. 1
  200. );
  201. /* save the control type of the message */
  202. msg->control_type = MQTT_CONTROL_CONNECT;
  203. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  204. return MQTT_OK;
  205. }
  206. enum MQTTErrors mqtt_publish(struct mqtt_client *client,
  207. const char* topic_name,
  208. void* application_message,
  209. size_t application_message_size,
  210. uint8_t publish_flags)
  211. {
  212. MQTT_PAL_MUTEX_LOCK(&client->mutex);
  213. uint16_t packet_id = __mqtt_next_pid(client);
  214. ssize_t rv;
  215. struct mqtt_queued_message *msg;
  216. /* try to pack the message */
  217. MQTT_CLIENT_TRY_PACK(
  218. rv, msg, client,
  219. mqtt_pack_publish_request(
  220. client->mq.curr, client->mq.curr_sz,
  221. topic_name,
  222. packet_id,
  223. application_message,
  224. application_message_size,
  225. publish_flags
  226. ),
  227. 1
  228. );
  229. /* save the control type and packet id of the message */
  230. msg->control_type = MQTT_CONTROL_PUBLISH;
  231. msg->packet_id = packet_id;
  232. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  233. return MQTT_OK;
  234. }
  235. ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id) {
  236. ssize_t rv;
  237. struct mqtt_queued_message *msg;
  238. /* try to pack the message */
  239. MQTT_CLIENT_TRY_PACK(
  240. rv, msg, client,
  241. mqtt_pack_pubxxx_request(
  242. client->mq.curr, client->mq.curr_sz,
  243. MQTT_CONTROL_PUBACK,
  244. packet_id
  245. ),
  246. 0
  247. );
  248. /* save the control type and packet id of the message */
  249. msg->control_type = MQTT_CONTROL_PUBACK;
  250. msg->packet_id = packet_id;
  251. return MQTT_OK;
  252. }
  253. ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id) {
  254. ssize_t rv;
  255. struct mqtt_queued_message *msg;
  256. /* try to pack the message */
  257. MQTT_CLIENT_TRY_PACK(
  258. rv, msg, client,
  259. mqtt_pack_pubxxx_request(
  260. client->mq.curr, client->mq.curr_sz,
  261. MQTT_CONTROL_PUBREC,
  262. packet_id
  263. ),
  264. 0
  265. );
  266. /* save the control type and packet id of the message */
  267. msg->control_type = MQTT_CONTROL_PUBREC;
  268. msg->packet_id = packet_id;
  269. return MQTT_OK;
  270. }
  271. ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id) {
  272. ssize_t rv;
  273. struct mqtt_queued_message *msg;
  274. /* try to pack the message */
  275. MQTT_CLIENT_TRY_PACK(
  276. rv, msg, client,
  277. mqtt_pack_pubxxx_request(
  278. client->mq.curr, client->mq.curr_sz,
  279. MQTT_CONTROL_PUBREL,
  280. packet_id
  281. ),
  282. 0
  283. );
  284. /* save the control type and packet id of the message */
  285. msg->control_type = MQTT_CONTROL_PUBREL;
  286. msg->packet_id = packet_id;
  287. return MQTT_OK;
  288. }
  289. ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id) {
  290. ssize_t rv;
  291. struct mqtt_queued_message *msg;
  292. /* try to pack the message */
  293. MQTT_CLIENT_TRY_PACK(
  294. rv, msg, client,
  295. mqtt_pack_pubxxx_request(
  296. client->mq.curr, client->mq.curr_sz,
  297. MQTT_CONTROL_PUBCOMP,
  298. packet_id
  299. ),
  300. 0
  301. );
  302. /* save the control type and packet id of the message */
  303. msg->control_type = MQTT_CONTROL_PUBCOMP;
  304. msg->packet_id = packet_id;
  305. return MQTT_OK;
  306. }
  307. enum MQTTErrors mqtt_subscribe(struct mqtt_client *client,
  308. const char* topic_name,
  309. int max_qos_level)
  310. {
  311. MQTT_PAL_MUTEX_LOCK(&client->mutex);
  312. uint16_t packet_id = __mqtt_next_pid(client);
  313. ssize_t rv;
  314. struct mqtt_queued_message *msg;
  315. /* try to pack the message */
  316. MQTT_CLIENT_TRY_PACK(
  317. rv, msg, client,
  318. mqtt_pack_subscribe_request(
  319. client->mq.curr, client->mq.curr_sz,
  320. packet_id,
  321. topic_name,
  322. max_qos_level,
  323. NULL
  324. ),
  325. 1
  326. );
  327. /* save the control type and packet id of the message */
  328. msg->control_type = MQTT_CONTROL_SUBSCRIBE;
  329. msg->packet_id = packet_id;
  330. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  331. return MQTT_OK;
  332. }
  333. enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client,
  334. const char* topic_name)
  335. {
  336. MQTT_PAL_MUTEX_LOCK(&client->mutex);
  337. uint16_t packet_id = __mqtt_next_pid(client);
  338. ssize_t rv;
  339. struct mqtt_queued_message *msg;
  340. /* try to pack the message */
  341. MQTT_CLIENT_TRY_PACK(
  342. rv, msg, client,
  343. mqtt_pack_unsubscribe_request(
  344. client->mq.curr, client->mq.curr_sz,
  345. packet_id,
  346. topic_name,
  347. NULL
  348. ),
  349. 1
  350. );
  351. /* save the control type and packet id of the message */
  352. msg->control_type = MQTT_CONTROL_UNSUBSCRIBE;
  353. msg->packet_id = packet_id;
  354. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  355. return MQTT_OK;
  356. }
  357. enum MQTTErrors mqtt_ping(struct mqtt_client *client) {
  358. enum MQTTErrors rv;
  359. MQTT_PAL_MUTEX_LOCK(&client->mutex);
  360. rv = __mqtt_ping(client);
  361. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  362. return rv;
  363. }
  364. enum MQTTErrors __mqtt_ping(struct mqtt_client *client)
  365. {
  366. ssize_t rv;
  367. struct mqtt_queued_message *msg;
  368. /* try to pack the message */
  369. MQTT_CLIENT_TRY_PACK(
  370. rv, msg, client,
  371. mqtt_pack_ping_request(
  372. client->mq.curr, client->mq.curr_sz
  373. ),
  374. 0
  375. );
  376. /* save the control type and packet id of the message */
  377. msg->control_type = MQTT_CONTROL_PINGREQ;
  378. return MQTT_OK;
  379. }
  380. enum MQTTErrors mqtt_disconnect(struct mqtt_client *client)
  381. {
  382. MQTT_PAL_MUTEX_LOCK(&client->mutex);
  383. ssize_t rv;
  384. struct mqtt_queued_message *msg;
  385. /* try to pack the message */
  386. MQTT_CLIENT_TRY_PACK(
  387. rv, msg, client,
  388. mqtt_pack_disconnect(
  389. client->mq.curr, client->mq.curr_sz
  390. ),
  391. 1
  392. );
  393. /* save the control type and packet id of the message */
  394. msg->control_type = MQTT_CONTROL_DISCONNECT;
  395. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  396. return MQTT_OK;
  397. }
  398. ssize_t __mqtt_send(struct mqtt_client *client)
  399. {
  400. MQTT_PAL_MUTEX_LOCK(&client->mutex);
  401. uint8_t inspected;
  402. if (client->error < 0 && client->error != MQTT_ERROR_SEND_BUFFER_IS_FULL) {
  403. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  404. return client->error;
  405. }
  406. /* loop through all messages in the queue */
  407. int len = (int)(mqtt_mq_length(&client->mq));
  408. int inflight_qos2 = 0;
  409. for(int i = 0; i < len; ++i) {
  410. struct mqtt_queued_message *msg = mqtt_mq_get(&client->mq, i);
  411. int resend = 0;
  412. if (msg->state == MQTT_QUEUED_UNSENT) {
  413. /* message has not been sent to lets send it */
  414. resend = 1;
  415. } else if (msg->state == MQTT_QUEUED_AWAITING_ACK) {
  416. /* check for timeout */
  417. if (MQTT_PAL_TIME() > msg->time_sent + client->response_timeout) {
  418. resend = 1;
  419. client->number_of_timeouts += 1;
  420. }
  421. }
  422. /* only send QoS 2 message if there are no inflight QoS 2 PUBLISH messages */
  423. if (msg->control_type == MQTT_CONTROL_PUBLISH
  424. && (msg->state == MQTT_QUEUED_UNSENT || msg->state == MQTT_QUEUED_AWAITING_ACK))
  425. {
  426. inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */
  427. if (inspected == 2) {
  428. if (inflight_qos2) {
  429. resend = 0;
  430. }
  431. inflight_qos2 = 1;
  432. }
  433. }
  434. /* goto next message if we don't need to send */
  435. if (!resend) {
  436. continue;
  437. }
  438. /* we're sending the message */
  439. ssize_t tmp = mqtt_pal_sendall(client->socketfd, msg->start, msg->size, 0);
  440. if (tmp < 0) {
  441. client->error = (enum MQTTErrors)tmp;
  442. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  443. return tmp;
  444. }
  445. /* update timeout watcher */
  446. client->time_of_last_send = MQTT_PAL_TIME();
  447. msg->time_sent = client->time_of_last_send;
  448. /*
  449. Determine the state to put the message in.
  450. Control Types:
  451. MQTT_CONTROL_CONNECT -> awaiting
  452. MQTT_CONTROL_CONNACK -> n/a
  453. MQTT_CONTROL_PUBLISH -> qos == 0 ? complete : awaiting
  454. MQTT_CONTROL_PUBACK -> complete
  455. MQTT_CONTROL_PUBREC -> awaiting
  456. MQTT_CONTROL_PUBREL -> awaiting
  457. MQTT_CONTROL_PUBCOMP -> complete
  458. MQTT_CONTROL_SUBSCRIBE -> awaiting
  459. MQTT_CONTROL_SUBACK -> n/a
  460. MQTT_CONTROL_UNSUBSCRIBE -> awaiting
  461. MQTT_CONTROL_UNSUBACK -> n/a
  462. MQTT_CONTROL_PINGREQ -> awaiting
  463. MQTT_CONTROL_PINGRESP -> n/a
  464. MQTT_CONTROL_DISCONNECT -> complete
  465. */
  466. switch (msg->control_type) {
  467. case MQTT_CONTROL_PUBACK:
  468. case MQTT_CONTROL_PUBCOMP:
  469. case MQTT_CONTROL_DISCONNECT:
  470. msg->state = MQTT_QUEUED_COMPLETE;
  471. break;
  472. case MQTT_CONTROL_PUBLISH:
  473. inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */
  474. if (inspected == 0) {
  475. msg->state = MQTT_QUEUED_COMPLETE;
  476. } else if (inspected == 1) {
  477. msg->state = MQTT_QUEUED_AWAITING_ACK;
  478. /*set DUP flag for subsequent sends */
  479. msg->start[1] |= MQTT_PUBLISH_DUP;
  480. } else {
  481. msg->state = MQTT_QUEUED_AWAITING_ACK;
  482. }
  483. break;
  484. case MQTT_CONTROL_CONNECT:
  485. case MQTT_CONTROL_PUBREC:
  486. case MQTT_CONTROL_PUBREL:
  487. case MQTT_CONTROL_SUBSCRIBE:
  488. case MQTT_CONTROL_UNSUBSCRIBE:
  489. case MQTT_CONTROL_PINGREQ:
  490. msg->state = MQTT_QUEUED_AWAITING_ACK;
  491. break;
  492. default:
  493. client->error = MQTT_ERROR_MALFORMED_REQUEST;
  494. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  495. return MQTT_ERROR_MALFORMED_REQUEST;
  496. }
  497. }
  498. /* check for keep-alive */
  499. mqtt_pal_time_t keep_alive_timeout = client->time_of_last_send + (mqtt_pal_time_t) ((float) (client->keep_alive) * 0.75);
  500. if (MQTT_PAL_TIME() > keep_alive_timeout) {
  501. ssize_t rv = __mqtt_ping(client);
  502. if (rv != MQTT_OK) {
  503. client->error = (enum MQTTErrors)rv;
  504. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  505. return rv;
  506. }
  507. }
  508. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  509. return MQTT_OK;
  510. }
  511. ssize_t __mqtt_recv(struct mqtt_client *client)
  512. {
  513. MQTT_PAL_MUTEX_LOCK(&client->mutex);
  514. struct mqtt_response response;
  515. /* read until there is nothing left to read */
  516. while(1) {
  517. /* read in as many bytes as possible */
  518. ssize_t rv, consumed;
  519. rv = mqtt_pal_recvall(client->socketfd, client->recv_buffer.curr, client->recv_buffer.curr_sz, 0);
  520. if (rv < 0) {
  521. /* an error occurred */
  522. client->error = (enum MQTTErrors)rv;
  523. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  524. return rv;
  525. } else {
  526. client->recv_buffer.curr += rv;
  527. client->recv_buffer.curr_sz -= (size_t)rv;
  528. }
  529. /* attempt to parse */
  530. consumed = mqtt_unpack_response(&response, client->recv_buffer.mem_start, (size_t)(client->recv_buffer.curr - client->recv_buffer.mem_start));
  531. if (consumed < 0) {
  532. client->error = (enum MQTTErrors)consumed;
  533. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  534. return consumed;
  535. } else if (consumed == 0) {
  536. /* if curr_sz is 0 then the buffer is too small to ever fit the message */
  537. if (client->recv_buffer.curr_sz == 0) {
  538. client->error = MQTT_ERROR_RECV_BUFFER_TOO_SMALL;
  539. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  540. return MQTT_ERROR_RECV_BUFFER_TOO_SMALL;
  541. }
  542. /* just need to wait for the rest of the data */
  543. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  544. return MQTT_OK;
  545. }
  546. /* response was unpacked successfully */
  547. struct mqtt_queued_message *msg = NULL;
  548. /*
  549. The switch statement below manages how the client responds to messages from the broker.
  550. Control Types (that we expect to receive from the broker):
  551. MQTT_CONTROL_CONNACK:
  552. -> release associated CONNECT
  553. -> handle response
  554. MQTT_CONTROL_PUBLISH:
  555. -> stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2
  556. -> call publish callback
  557. MQTT_CONTROL_PUBACK:
  558. -> release associated PUBLISH
  559. MQTT_CONTROL_PUBREC:
  560. -> release PUBLISH
  561. -> stage PUBREL
  562. MQTT_CONTROL_PUBREL:
  563. -> release associated PUBREC
  564. -> stage PUBCOMP
  565. MQTT_CONTROL_PUBCOMP:
  566. -> release PUBREL
  567. MQTT_CONTROL_SUBACK:
  568. -> release SUBSCRIBE
  569. -> handle response
  570. MQTT_CONTROL_UNSUBACK:
  571. -> release UNSUBSCRIBE
  572. MQTT_CONTROL_PINGRESP:
  573. -> release PINGREQ
  574. */
  575. switch (response.fixed_header.control_type) {
  576. case MQTT_CONTROL_CONNACK:
  577. /* release associated CONNECT */
  578. msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_CONNECT, NULL);
  579. if (msg == NULL) {
  580. client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
  581. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  582. return MQTT_ERROR_ACK_OF_UNKNOWN;
  583. }
  584. msg->state = MQTT_QUEUED_COMPLETE;
  585. /* initialize typical response time */
  586. client->typical_response_time = (double) (MQTT_PAL_TIME() - msg->time_sent);
  587. /* check that connection was successful */
  588. if (response.decoded.connack.return_code != MQTT_CONNACK_ACCEPTED) {
  589. client->error = MQTT_ERROR_CONNECTION_REFUSED;
  590. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  591. return MQTT_ERROR_CONNECTION_REFUSED;
  592. }
  593. break;
  594. case MQTT_CONTROL_PUBLISH:
  595. /* stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2 */
  596. if (response.decoded.publish.qos_level == 1) {
  597. rv = __mqtt_puback(client, response.decoded.publish.packet_id);
  598. if (rv != MQTT_OK) {
  599. client->error = (enum MQTTErrors)rv;
  600. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  601. return rv;
  602. }
  603. } else if (response.decoded.publish.qos_level == 2) {
  604. /* check if this is a duplicate */
  605. if (mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREC, &response.decoded.publish.packet_id) != NULL) {
  606. break;
  607. }
  608. rv = __mqtt_pubrec(client, response.decoded.publish.packet_id);
  609. if (rv != MQTT_OK) {
  610. client->error = (enum MQTTErrors)rv;
  611. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  612. return rv;
  613. }
  614. }
  615. /* call publish callback */
  616. client->publish_response_callback(&client->publish_response_callback_state, &response.decoded.publish);
  617. break;
  618. case MQTT_CONTROL_PUBACK:
  619. /* release associated PUBLISH */
  620. msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.puback.packet_id);
  621. if (msg == NULL) {
  622. client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
  623. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  624. return MQTT_ERROR_ACK_OF_UNKNOWN;
  625. }
  626. msg->state = MQTT_QUEUED_COMPLETE;
  627. /* update response time */
  628. client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
  629. break;
  630. case MQTT_CONTROL_PUBREC:
  631. /* check if this is a duplicate */
  632. if (mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREL, &response.decoded.pubrec.packet_id) != NULL) {
  633. break;
  634. }
  635. /* release associated PUBLISH */
  636. msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.pubrec.packet_id);
  637. if (msg == NULL) {
  638. client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
  639. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  640. return MQTT_ERROR_ACK_OF_UNKNOWN;
  641. }
  642. msg->state = MQTT_QUEUED_COMPLETE;
  643. /* update response time */
  644. client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
  645. /* stage PUBREL */
  646. rv = __mqtt_pubrel(client, response.decoded.pubrec.packet_id);
  647. if (rv != MQTT_OK) {
  648. client->error = (enum MQTTErrors)rv;
  649. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  650. return rv;
  651. }
  652. break;
  653. case MQTT_CONTROL_PUBREL:
  654. /* release associated PUBREC */
  655. msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREC, &response.decoded.pubrel.packet_id);
  656. if (msg == NULL) {
  657. client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
  658. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  659. return MQTT_ERROR_ACK_OF_UNKNOWN;
  660. }
  661. msg->state = MQTT_QUEUED_COMPLETE;
  662. /* update response time */
  663. client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
  664. /* stage PUBCOMP */
  665. rv = __mqtt_pubcomp(client, response.decoded.pubrec.packet_id);
  666. if (rv != MQTT_OK) {
  667. client->error = (enum MQTTErrors)rv;
  668. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  669. return rv;
  670. }
  671. break;
  672. case MQTT_CONTROL_PUBCOMP:
  673. /* release associated PUBREL */
  674. msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREL, &response.decoded.pubcomp.packet_id);
  675. if (msg == NULL) {
  676. client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
  677. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  678. return MQTT_ERROR_ACK_OF_UNKNOWN;
  679. }
  680. msg->state = MQTT_QUEUED_COMPLETE;
  681. /* update response time */
  682. client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
  683. break;
  684. case MQTT_CONTROL_SUBACK:
  685. /* release associated SUBSCRIBE */
  686. msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_SUBSCRIBE, &response.decoded.suback.packet_id);
  687. if (msg == NULL) {
  688. client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
  689. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  690. return MQTT_ERROR_ACK_OF_UNKNOWN;
  691. }
  692. msg->state = MQTT_QUEUED_COMPLETE;
  693. /* update response time */
  694. client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
  695. /* check that subscription was successful (not currently only one subscribe at a time) */
  696. if (response.decoded.suback.return_codes[0] == MQTT_SUBACK_FAILURE) {
  697. client->error = MQTT_ERROR_SUBSCRIBE_FAILED;
  698. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  699. return MQTT_ERROR_SUBSCRIBE_FAILED;
  700. }
  701. break;
  702. case MQTT_CONTROL_UNSUBACK:
  703. /* release associated UNSUBSCRIBE */
  704. msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_UNSUBSCRIBE, &response.decoded.unsuback.packet_id);
  705. if (msg == NULL) {
  706. client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
  707. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  708. return MQTT_ERROR_ACK_OF_UNKNOWN;
  709. }
  710. msg->state = MQTT_QUEUED_COMPLETE;
  711. /* update response time */
  712. client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
  713. break;
  714. case MQTT_CONTROL_PINGRESP:
  715. /* release associated PINGREQ */
  716. msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PINGREQ, NULL);
  717. if (msg == NULL) {
  718. client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
  719. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  720. return MQTT_ERROR_ACK_OF_UNKNOWN;
  721. }
  722. msg->state = MQTT_QUEUED_COMPLETE;
  723. /* update response time */
  724. client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
  725. break;
  726. default:
  727. client->error = MQTT_ERROR_MALFORMED_RESPONSE;
  728. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  729. return MQTT_ERROR_MALFORMED_RESPONSE;
  730. }
  731. /* we've handled the response, now clean the buffer */
  732. void *dest = client->recv_buffer.mem_start;
  733. void *src = client->recv_buffer.mem_start + consumed;
  734. size_t n = (size_t)(client->recv_buffer.curr - client->recv_buffer.mem_start - consumed);
  735. memmove(dest, src, n);
  736. client->recv_buffer.curr -= consumed;
  737. client->recv_buffer.curr_sz += (size_t)consumed;
  738. }
  739. /* never hit (always return once there's nothing left. */
  740. MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
  741. return MQTT_OK;
  742. }
  743. /* FIXED HEADER */
  744. #define MQTT_BITFIELD_RULE_VIOLOATION(bitfield, rule_value, rule_mask) ((bitfield ^ rule_value) & rule_mask)
  745. struct mqtt_fixed_header_rules_s{
  746. const uint8_t control_type_is_valid[16];
  747. const uint8_t required_flags[16];
  748. const uint8_t mask_required_flags[16];
  749. } ;
  750. struct mqtt_fixed_header_rules_s mqtt_fixed_header_rules =
  751. {
  752. { /* boolean value, true if type is valid */
  753. 0x00, /* MQTT_CONTROL_RESERVED */
  754. 0x01, /* MQTT_CONTROL_CONNECT */
  755. 0x01, /* MQTT_CONTROL_CONNACK */
  756. 0x01, /* MQTT_CONTROL_PUBLISH */
  757. 0x01, /* MQTT_CONTROL_PUBACK */
  758. 0x01, /* MQTT_CONTROL_PUBREC */
  759. 0x01, /* MQTT_CONTROL_PUBREL */
  760. 0x01, /* MQTT_CONTROL_PUBCOMP */
  761. 0x01, /* MQTT_CONTROL_SUBSCRIBE */
  762. 0x01, /* MQTT_CONTROL_SUBACK */
  763. 0x01, /* MQTT_CONTROL_UNSUBSCRIBE */
  764. 0x01, /* MQTT_CONTROL_UNSUBACK */
  765. 0x01, /* MQTT_CONTROL_PINGREQ */
  766. 0x01, /* MQTT_CONTROL_PINGRESP */
  767. 0x01, /* MQTT_CONTROL_DISCONNECT */
  768. 0x00 /* MQTT_CONTROL_RESERVED */
  769. },
  770. { /* flags that must be set for the associated control type */
  771. 0x00, /* MQTT_CONTROL_RESERVED */
  772. 0x00, /* MQTT_CONTROL_CONNECT */
  773. 0x00, /* MQTT_CONTROL_CONNACK */
  774. 0x00, /* MQTT_CONTROL_PUBLISH */
  775. 0x00, /* MQTT_CONTROL_PUBACK */
  776. 0x00, /* MQTT_CONTROL_PUBREC */
  777. 0x02, /* MQTT_CONTROL_PUBREL */
  778. 0x00, /* MQTT_CONTROL_PUBCOMP */
  779. 0x02, /* MQTT_CONTROL_SUBSCRIBE */
  780. 0x00, /* MQTT_CONTROL_SUBACK */
  781. 0x02, /* MQTT_CONTROL_UNSUBSCRIBE */
  782. 0x00, /* MQTT_CONTROL_UNSUBACK */
  783. 0x00, /* MQTT_CONTROL_PINGREQ */
  784. 0x00, /* MQTT_CONTROL_PINGRESP */
  785. 0x00, /* MQTT_CONTROL_DISCONNECT */
  786. 0x00 /* MQTT_CONTROL_RESERVED */
  787. },
  788. { /* mask of flags that must be specific values for the associated control type*/
  789. 0x00, /* MQTT_CONTROL_RESERVED */
  790. 0x0F, /* MQTT_CONTROL_CONNECT */
  791. 0x0F, /* MQTT_CONTROL_CONNACK */
  792. 0x00, /* MQTT_CONTROL_PUBLISH */
  793. 0x0F, /* MQTT_CONTROL_PUBACK */
  794. 0x0F, /* MQTT_CONTROL_PUBREC */
  795. 0x0F, /* MQTT_CONTROL_PUBREL */
  796. 0x0F, /* MQTT_CONTROL_PUBCOMP */
  797. 0x0F, /* MQTT_CONTROL_SUBSCRIBE */
  798. 0x0F, /* MQTT_CONTROL_SUBACK */
  799. 0x0F, /* MQTT_CONTROL_UNSUBSCRIBE */
  800. 0x0F, /* MQTT_CONTROL_UNSUBACK */
  801. 0x0F, /* MQTT_CONTROL_PINGREQ */
  802. 0x0F, /* MQTT_CONTROL_PINGRESP */
  803. 0x0F, /* MQTT_CONTROL_DISCONNECT */
  804. 0x00 /* MQTT_CONTROL_RESERVED */
  805. }
  806. };
  807. static ssize_t mqtt_fixed_header_rule_violation(const struct mqtt_fixed_header *fixed_header) {
  808. uint8_t control_type;
  809. uint8_t control_flags;
  810. uint8_t required_flags;
  811. uint8_t mask_required_flags;
  812. /* get value and rules */
  813. control_type = (uint8_t)fixed_header->control_type;
  814. control_flags = fixed_header->control_flags;
  815. required_flags = mqtt_fixed_header_rules.required_flags[control_type];
  816. mask_required_flags = mqtt_fixed_header_rules.mask_required_flags[control_type];
  817. /* check for valid type */
  818. if (!mqtt_fixed_header_rules.control_type_is_valid[control_type]) {
  819. return MQTT_ERROR_CONTROL_FORBIDDEN_TYPE;
  820. }
  821. /* check that flags are appropriate */
  822. if(MQTT_BITFIELD_RULE_VIOLOATION(control_flags, required_flags, mask_required_flags)) {
  823. return MQTT_ERROR_CONTROL_INVALID_FLAGS;
  824. }
  825. return 0;
  826. }
  827. ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t *buf, size_t bufsz) {
  828. struct mqtt_fixed_header *fixed_header;
  829. const uint8_t *start = buf;
  830. int lshift;
  831. ssize_t errcode;
  832. /* check for null pointers or empty buffer */
  833. if (response == NULL || buf == NULL) {
  834. return MQTT_ERROR_NULLPTR;
  835. }
  836. fixed_header = &(response->fixed_header);
  837. /* check that bufsz is not zero */
  838. if (bufsz == 0) return 0;
  839. /* parse control type and flags */
  840. fixed_header->control_type = (enum MQTTControlPacketType)(*buf >> 4);
  841. fixed_header->control_flags = (uint8_t)(*buf & 0x0F);
  842. /* parse remaining size */
  843. fixed_header->remaining_length = 0;
  844. lshift = 0;
  845. do {
  846. /* consume byte and assert at least 1 byte left */
  847. --bufsz;
  848. ++buf;
  849. if (bufsz == 0) return 0;
  850. /* parse next byte*/
  851. fixed_header->remaining_length += (uint32_t)((*buf & 0x7F) << lshift);
  852. lshift += 7;
  853. } while(*buf & 0x80); /* while continue bit is set */
  854. /* consume last byte */
  855. --bufsz;
  856. ++buf;
  857. /* check that the fixed header is valid */
  858. errcode = mqtt_fixed_header_rule_violation(fixed_header);
  859. if (errcode) {
  860. return errcode;
  861. }
  862. /* check that the buffer size if GT remaining length */
  863. if (bufsz < fixed_header->remaining_length) {
  864. return 0;
  865. }
  866. /* return how many bytes were consumed */
  867. return buf - start;
  868. }
  869. ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fixed_header *fixed_header) {
  870. const uint8_t *start = buf;
  871. ssize_t errcode;
  872. uint32_t remaining_length;
  873. /* check for null pointers or empty buffer */
  874. if (fixed_header == NULL || buf == NULL) {
  875. return MQTT_ERROR_NULLPTR;
  876. }
  877. /* check that the fixed header is valid */
  878. errcode = mqtt_fixed_header_rule_violation(fixed_header);
  879. if (errcode) {
  880. return errcode;
  881. }
  882. /* check that bufsz is not zero */
  883. if (bufsz == 0) return 0;
  884. /* pack control type and flags */
  885. *buf = (uint8_t)((((uint8_t) fixed_header->control_type) << 4) & 0xF0);
  886. *buf = (uint8_t)(*buf | (((uint8_t) fixed_header->control_flags) & 0x0F));
  887. remaining_length = fixed_header->remaining_length;
  888. do {
  889. /* consume byte and assert at least 1 byte left */
  890. --bufsz;
  891. ++buf;
  892. if (bufsz == 0) return 0;
  893. /* pack next byte */
  894. *buf = remaining_length & 0x7F;
  895. if(remaining_length > 127) *buf |= 0x80;
  896. remaining_length = remaining_length >> 7;
  897. } while(*buf & 0x80);
  898. /* consume last byte */
  899. --bufsz;
  900. ++buf;
  901. /* check that there's still enough space in buffer for packet */
  902. if (bufsz < fixed_header->remaining_length) {
  903. return 0;
  904. }
  905. /* return how many bytes were consumed */
  906. return buf - start;
  907. }
  908. /* CONNECT */
  909. ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz,
  910. const char* client_id,
  911. const char* will_topic,
  912. const void* will_message,
  913. size_t will_message_size,
  914. const char* user_name,
  915. const char* password,
  916. uint8_t connect_flags,
  917. uint16_t keep_alive)
  918. {
  919. struct mqtt_fixed_header fixed_header;
  920. uint32_t remaining_length;
  921. //const uint8_t const* start = buf;
  922. const uint8_t * start = buf;
  923. ssize_t rv;
  924. uint8_t temp;
  925. /* pack the fixed headr */
  926. fixed_header.control_type = MQTT_CONTROL_CONNECT;
  927. fixed_header.control_flags = 0x00;
  928. /* calculate remaining length and build connect_flags at the same time */
  929. connect_flags = (uint8_t)(connect_flags & ~MQTT_CONNECT_RESERVED);
  930. remaining_length = 10; /* size of variable header */
  931. if (client_id == NULL) {
  932. /* client_id is a mandatory parameter */
  933. return MQTT_ERROR_CONNECT_NULL_CLIENT_ID;
  934. } else {
  935. /* mqtt_string length is strlen + 2 */
  936. remaining_length += (uint32_t)__mqtt_packed_cstrlen(client_id);
  937. }
  938. if (will_topic != NULL) {
  939. /* there is a will */
  940. connect_flags |= MQTT_CONNECT_WILL_FLAG;
  941. remaining_length += (uint32_t)__mqtt_packed_cstrlen(will_topic);
  942. if (will_message == NULL) {
  943. /* if there's a will there MUST be a will message */
  944. return MQTT_ERROR_CONNECT_NULL_WILL_MESSAGE;
  945. }
  946. remaining_length += (uint32_t)(2 + will_message_size); /* size of will_message */
  947. /* assert that the will QOS is valid (i.e. not 3) */
  948. temp = connect_flags & 0x18; /* mask to QOS */
  949. if (temp == 0x18) {
  950. /* bitwise equality with QoS 3 (invalid)*/
  951. return MQTT_ERROR_CONNECT_FORBIDDEN_WILL_QOS;
  952. }
  953. } else {
  954. /* there is no will so set all will flags to zero */
  955. connect_flags &= (uint8_t)~MQTT_CONNECT_WILL_FLAG;
  956. connect_flags &= (uint8_t)~0x18;
  957. connect_flags &= (uint8_t)~MQTT_CONNECT_WILL_RETAIN;
  958. }
  959. if (user_name != NULL) {
  960. /* a user name is present */
  961. connect_flags |= MQTT_CONNECT_USER_NAME;
  962. remaining_length += (uint32_t)__mqtt_packed_cstrlen(user_name);
  963. } else {
  964. connect_flags &= (uint8_t)~MQTT_CONNECT_USER_NAME;
  965. }
  966. if (password != NULL) {
  967. /* a password is present */
  968. connect_flags |= MQTT_CONNECT_PASSWORD;
  969. remaining_length += (uint32_t)__mqtt_packed_cstrlen(password);
  970. } else {
  971. connect_flags &= (uint8_t)~MQTT_CONNECT_PASSWORD;
  972. }
  973. /* fixed header length is now calculated*/
  974. fixed_header.remaining_length = remaining_length;
  975. /* pack fixed header and perform error checks */
  976. rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
  977. if (rv <= 0) {
  978. /* something went wrong */
  979. return rv;
  980. }
  981. buf += rv;
  982. bufsz -= (size_t)rv;
  983. /* check that the buffer has enough space to fit the remaining length */
  984. if (bufsz < fixed_header.remaining_length) {
  985. return 0;
  986. }
  987. /* pack the variable header */
  988. *buf++ = 0x00;
  989. *buf++ = 0x04;
  990. *buf++ = (uint8_t) 'M';
  991. *buf++ = (uint8_t) 'Q';
  992. *buf++ = (uint8_t) 'T';
  993. *buf++ = (uint8_t) 'T';
  994. *buf++ = MQTT_PROTOCOL_LEVEL;
  995. *buf++ = connect_flags;
  996. *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(keep_alive);
  997. buf += 2;
  998. /* pack the payload */
  999. buf += __mqtt_pack_str(buf, client_id);
  1000. if (connect_flags & MQTT_CONNECT_WILL_FLAG) {
  1001. buf += __mqtt_pack_str(buf, will_topic);
  1002. memcpy(buf, will_message, will_message_size);
  1003. buf += will_message_size;
  1004. }
  1005. if (connect_flags & MQTT_CONNECT_USER_NAME) {
  1006. buf += __mqtt_pack_str(buf, user_name);
  1007. }
  1008. if (connect_flags & MQTT_CONNECT_PASSWORD) {
  1009. buf += __mqtt_pack_str(buf, password);
  1010. }
  1011. /* return the number of bytes that were consumed */
  1012. return buf - start;
  1013. }
  1014. /* CONNACK */
  1015. ssize_t mqtt_unpack_connack_response(struct mqtt_response *mqtt_response, const uint8_t *buf) {
  1016. const uint8_t *const start = buf;
  1017. struct mqtt_response_connack *response;
  1018. /* check that remaining length is 2 */
  1019. if (mqtt_response->fixed_header.remaining_length != 2) {
  1020. return MQTT_ERROR_MALFORMED_RESPONSE;
  1021. }
  1022. response = &(mqtt_response->decoded.connack);
  1023. /* unpack */
  1024. if (*buf & 0xFE) {
  1025. /* only bit 1 can be set */
  1026. return MQTT_ERROR_CONNACK_FORBIDDEN_FLAGS;
  1027. } else {
  1028. response->session_present_flag = *buf++;
  1029. }
  1030. if (*buf > 5u) {
  1031. /* only bit 1 can be set */
  1032. return MQTT_ERROR_CONNACK_FORBIDDEN_CODE;
  1033. } else {
  1034. response->return_code = (enum MQTTConnackReturnCode) *buf++;
  1035. }
  1036. return buf - start;
  1037. }
  1038. /* DISCONNECT */
  1039. ssize_t mqtt_pack_disconnect(uint8_t *buf, size_t bufsz) {
  1040. struct mqtt_fixed_header fixed_header;
  1041. fixed_header.control_type = MQTT_CONTROL_DISCONNECT;
  1042. fixed_header.control_flags = 0;
  1043. fixed_header.remaining_length = 0;
  1044. return mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
  1045. }
  1046. /* PING */
  1047. ssize_t mqtt_pack_ping_request(uint8_t *buf, size_t bufsz) {
  1048. struct mqtt_fixed_header fixed_header;
  1049. fixed_header.control_type = MQTT_CONTROL_PINGREQ;
  1050. fixed_header.control_flags = 0;
  1051. fixed_header.remaining_length = 0;
  1052. return mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
  1053. }
  1054. /* PUBLISH */
  1055. ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz,
  1056. const char* topic_name,
  1057. uint16_t packet_id,
  1058. void* application_message,
  1059. size_t application_message_size,
  1060. uint8_t publish_flags)
  1061. {
  1062. const uint8_t *const start = buf;
  1063. ssize_t rv;
  1064. struct mqtt_fixed_header fixed_header;
  1065. uint16_t remaining_length;
  1066. uint8_t inspected_qos;
  1067. /* check for null pointers */
  1068. if(buf == NULL || topic_name == NULL) {
  1069. return MQTT_ERROR_NULLPTR;
  1070. }
  1071. /* inspect QoS level */
  1072. inspected_qos = (uint8_t)((publish_flags & 0x06) >> 1); /* mask */
  1073. /* build the fixed header */
  1074. fixed_header.control_type = MQTT_CONTROL_PUBLISH;
  1075. /* calculate remaining length */
  1076. remaining_length = (uint16_t)__mqtt_packed_cstrlen(topic_name);
  1077. if (inspected_qos > 0) {
  1078. remaining_length = (uint16_t)(remaining_length + 2);
  1079. }
  1080. remaining_length = (uint16_t) (remaining_length + application_message_size);
  1081. fixed_header.remaining_length = remaining_length;
  1082. /* force dup to 0 if qos is 0 */
  1083. if (inspected_qos == 0) {
  1084. publish_flags &= (uint8_t)~MQTT_PUBLISH_DUP;
  1085. }
  1086. /* make sure that qos is not 3 */
  1087. if (inspected_qos == 3) {
  1088. return MQTT_ERROR_PUBLISH_FORBIDDEN_QOS;
  1089. }
  1090. fixed_header.control_flags = publish_flags & 0x7; //@@@
  1091. /* pack fixed header */
  1092. rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
  1093. if (rv <= 0) {
  1094. /* something went wrong */
  1095. return rv;
  1096. }
  1097. buf += rv;
  1098. bufsz -= (size_t)rv;
  1099. /* check that buffer is big enough */
  1100. if (bufsz < remaining_length) {
  1101. return 0;
  1102. }
  1103. /* pack variable header */
  1104. buf += __mqtt_pack_str(buf, topic_name);
  1105. if (inspected_qos > 0) {
  1106. *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id);
  1107. buf += 2;
  1108. }
  1109. /* pack payload */
  1110. memcpy(buf, application_message, application_message_size);
  1111. buf += application_message_size;
  1112. return buf - start;
  1113. }
  1114. ssize_t mqtt_unpack_publish_response(struct mqtt_response *mqtt_response, const uint8_t *buf)
  1115. {
  1116. //const uint8_t const *start = buf;
  1117. const uint8_t *start = buf;
  1118. struct mqtt_fixed_header *fixed_header;
  1119. struct mqtt_response_publish *response;
  1120. fixed_header = &(mqtt_response->fixed_header);
  1121. response = &(mqtt_response->decoded.publish);
  1122. /* get flags */
  1123. response->dup_flag = (uint8_t)((fixed_header->control_flags & MQTT_PUBLISH_DUP) >> 3);
  1124. response->qos_level = (uint8_t)((fixed_header->control_flags & 0x06) >> 1);
  1125. response->retain_flag = fixed_header->control_flags & MQTT_PUBLISH_RETAIN;
  1126. /* make sure that remaining length is valid */
  1127. if (mqtt_response->fixed_header.remaining_length < 4) {
  1128. return MQTT_ERROR_MALFORMED_RESPONSE;
  1129. }
  1130. /* parse variable header */
  1131. response->topic_name_size = (uint16_t) MQTT_PAL_NTOHS(*(const uint16_t*) buf);
  1132. buf += 2;
  1133. response->topic_name = buf;
  1134. buf += response->topic_name_size;
  1135. if (response->qos_level > 0) {
  1136. response->packet_id = (uint16_t) MQTT_PAL_NTOHS(*(const uint16_t*) buf);
  1137. buf += 2;
  1138. }
  1139. /* get payload */
  1140. response->application_message = buf;
  1141. if (response->qos_level == 0) {
  1142. response->application_message_size = fixed_header->remaining_length - response->topic_name_size - 2;
  1143. } else {
  1144. response->application_message_size = fixed_header->remaining_length - response->topic_name_size - 4;
  1145. }
  1146. buf += response->application_message_size;
  1147. /* return number of bytes consumed */
  1148. return buf - start;
  1149. }
  1150. /* PUBXXX */
  1151. ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz,
  1152. enum MQTTControlPacketType control_type,
  1153. uint16_t packet_id)
  1154. {
  1155. const uint8_t *const start = buf;
  1156. struct mqtt_fixed_header fixed_header;
  1157. ssize_t rv;
  1158. if (buf == NULL) {
  1159. return MQTT_ERROR_NULLPTR;
  1160. }
  1161. /* pack fixed header */
  1162. fixed_header.control_type = control_type;
  1163. if (control_type == MQTT_CONTROL_PUBREL) {
  1164. fixed_header.control_flags = 0x02;
  1165. } else {
  1166. fixed_header.control_flags = 0;
  1167. }
  1168. fixed_header.remaining_length = 2;
  1169. rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
  1170. if (rv <= 0) {
  1171. return rv;
  1172. }
  1173. buf += rv;
  1174. bufsz -= (size_t)rv;
  1175. if (bufsz < fixed_header.remaining_length) {
  1176. return 0;
  1177. }
  1178. *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id);
  1179. buf += 2;
  1180. return buf - start;
  1181. }
  1182. ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf)
  1183. {
  1184. const uint8_t *const start = buf;
  1185. uint16_t packet_id;
  1186. /* assert remaining length is correct */
  1187. if (mqtt_response->fixed_header.remaining_length != 2) {
  1188. return MQTT_ERROR_MALFORMED_RESPONSE;
  1189. }
  1190. /* parse packet_id */
  1191. packet_id = (uint16_t) MQTT_PAL_NTOHS(*(const uint16_t*) buf);
  1192. buf += 2;
  1193. if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBACK) {
  1194. mqtt_response->decoded.puback.packet_id = packet_id;
  1195. } else if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBREC) {
  1196. mqtt_response->decoded.pubrec.packet_id = packet_id;
  1197. } else if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBREL) {
  1198. mqtt_response->decoded.pubrel.packet_id = packet_id;
  1199. } else {
  1200. mqtt_response->decoded.pubcomp.packet_id = packet_id;
  1201. }
  1202. return buf - start;
  1203. }
  1204. /* SUBACK */
  1205. ssize_t mqtt_unpack_suback_response (struct mqtt_response *mqtt_response, const uint8_t *buf) {
  1206. const uint8_t *const start = buf;
  1207. uint32_t remaining_length = mqtt_response->fixed_header.remaining_length;
  1208. /* assert remaining length is at least 3 (for packet id and at least 1 topic) */
  1209. if (remaining_length < 3) {
  1210. return MQTT_ERROR_MALFORMED_RESPONSE;
  1211. }
  1212. /* unpack packet_id */
  1213. mqtt_response->decoded.suback.packet_id = (uint16_t) MQTT_PAL_NTOHS(*(const uint16_t*) buf);
  1214. buf += 2;
  1215. remaining_length -= 2;
  1216. /* unpack return codes */
  1217. mqtt_response->decoded.suback.num_return_codes = (size_t) remaining_length;
  1218. mqtt_response->decoded.suback.return_codes = buf;
  1219. buf += remaining_length;
  1220. return buf - start;
  1221. }
  1222. /* TODO: currently suppressing:
  1223. * passing an object that undergoes default argument promotion to 'va_start' has undefined behavior [-Werror,-Wvarargs]
  1224. * va_start(args, (uint16_t)packet_id);
  1225. * ^
  1226. */
  1227. #pragma GCC diagnostic push
  1228. #pragma GCC diagnostic ignored "-Wvarargs"
  1229. /* SUBSCRIBE */
  1230. ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_id, ...) {
  1231. va_list args;
  1232. const uint8_t *const start = buf;
  1233. ssize_t rv;
  1234. struct mqtt_fixed_header fixed_header;
  1235. unsigned int num_subs = 0;
  1236. unsigned int i;
  1237. const char *topic[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
  1238. uint8_t max_qos[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
  1239. /* parse all subscriptions */
  1240. va_start(args, (uint16_t)packet_id);
  1241. while(1) {
  1242. topic[num_subs] = va_arg(args, const char*);
  1243. if (topic[num_subs] == NULL) {
  1244. /* end of list */
  1245. break;
  1246. }
  1247. max_qos[num_subs] = (uint8_t) va_arg(args, unsigned int);
  1248. ++num_subs;
  1249. if (num_subs >= MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS) {
  1250. return MQTT_ERROR_SUBSCRIBE_TOO_MANY_TOPICS;
  1251. }
  1252. }
  1253. va_end(args);
  1254. /* build the fixed header */
  1255. fixed_header.control_type = MQTT_CONTROL_SUBSCRIBE;
  1256. fixed_header.control_flags = 2u;
  1257. fixed_header.remaining_length = 2u; /* size of variable header */
  1258. for(i = 0; i < num_subs; ++i) {
  1259. /* payload is topic name + max qos (1 byte) */
  1260. fixed_header.remaining_length += (uint32_t )(__mqtt_packed_cstrlen(topic[i]) + 1);
  1261. }
  1262. /* pack the fixed header */
  1263. rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
  1264. if (rv <= 0) {
  1265. return rv;
  1266. }
  1267. buf += rv;
  1268. bufsz -= (size_t)rv;
  1269. /* check that the buffer has enough space */
  1270. if (bufsz < fixed_header.remaining_length) {
  1271. return 0;
  1272. }
  1273. /* pack variable header */
  1274. *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id);
  1275. buf += 2;
  1276. /* pack payload */
  1277. for(i = 0; i < num_subs; ++i) {
  1278. buf += __mqtt_pack_str(buf, topic[i]);
  1279. *buf++ = max_qos[i];
  1280. }
  1281. return buf - start;
  1282. }
  1283. /* UNSUBACK */
  1284. ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf)
  1285. {
  1286. const uint8_t *const start = buf;
  1287. if (mqtt_response->fixed_header.remaining_length != 2) {
  1288. return MQTT_ERROR_MALFORMED_RESPONSE;
  1289. }
  1290. /* parse packet_id */
  1291. mqtt_response->decoded.unsuback.packet_id = (uint16_t) MQTT_PAL_NTOHS(*(const uint16_t*) buf);
  1292. buf += 2;
  1293. return buf - start;
  1294. }
  1295. /* UNSUBSCRIBE */
  1296. ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_id, ...) {
  1297. va_list args;
  1298. const uint8_t *const start = buf;
  1299. ssize_t rv;
  1300. struct mqtt_fixed_header fixed_header;
  1301. unsigned int num_subs = 0;
  1302. unsigned int i;
  1303. const char *topic[MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
  1304. /* parse all subscriptions */
  1305. va_start(args, (uint16_t)packet_id);
  1306. while(1) {
  1307. topic[num_subs] = va_arg(args, const char*);
  1308. if (topic[num_subs] == NULL) {
  1309. /* end of list */
  1310. break;
  1311. }
  1312. ++num_subs;
  1313. if (num_subs >= MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS) {
  1314. return MQTT_ERROR_UNSUBSCRIBE_TOO_MANY_TOPICS;
  1315. }
  1316. }
  1317. va_end(args);
  1318. /* build the fixed header */
  1319. fixed_header.control_type = MQTT_CONTROL_UNSUBSCRIBE;
  1320. fixed_header.control_flags = 2u;
  1321. fixed_header.remaining_length = 2u; /* size of variable header */
  1322. for(i = 0; i < num_subs; ++i) {
  1323. /* payload is topic name */
  1324. fixed_header.remaining_length += (uint32_t)(__mqtt_packed_cstrlen(topic[i]));
  1325. }
  1326. /* pack the fixed header */
  1327. rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
  1328. if (rv <= 0) {
  1329. return rv;
  1330. }
  1331. buf += rv;
  1332. bufsz -= (size_t)rv;
  1333. /* check that the buffer has enough space */
  1334. if (bufsz < fixed_header.remaining_length) {
  1335. return 0;
  1336. }
  1337. /* pack variable header */
  1338. *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id);
  1339. buf += 2;
  1340. /* pack payload */
  1341. for(i = 0; i < num_subs; ++i) {
  1342. buf += __mqtt_pack_str(buf, topic[i]);
  1343. }
  1344. return buf - start;
  1345. }
  1346. #pragma GCC diagnostic pop
  1347. /* MESSAGE QUEUE */
  1348. void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz)
  1349. {
  1350. mq->mem_start = buf;
  1351. mq->mem_end = (uint8_t *)buf + bufsz;
  1352. mq->curr = (uint8_t *)buf;
  1353. mq->queue_tail = (struct mqtt_queued_message *)mq->mem_end;
  1354. mq->curr_sz = (size_t)(mqtt_mq_currsz(mq));
  1355. }
  1356. struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes)
  1357. {
  1358. /* make queued message header */
  1359. --(mq->queue_tail);
  1360. mq->queue_tail->start = mq->curr;
  1361. mq->queue_tail->size = nbytes;
  1362. mq->queue_tail->state = MQTT_QUEUED_UNSENT;
  1363. /* move curr and recalculate curr_sz */
  1364. mq->curr += nbytes;
  1365. mq->curr_sz = (size_t)(mqtt_mq_currsz(mq));
  1366. return mq->queue_tail;
  1367. }
  1368. void mqtt_mq_clean(struct mqtt_message_queue *mq) {
  1369. struct mqtt_queued_message *new_head;
  1370. for(new_head = mqtt_mq_get(mq, 0); new_head >= mq->queue_tail; --new_head) {
  1371. if (new_head->state != MQTT_QUEUED_COMPLETE) break;
  1372. }
  1373. /* check if everything can be removed */
  1374. if (new_head < mq->queue_tail) {
  1375. mq->curr = (uint8_t *)mq->mem_start;
  1376. mq->queue_tail = (struct mqtt_queued_message *)mq->mem_end;
  1377. mq->curr_sz = (size_t)(mqtt_mq_currsz(mq));
  1378. return;
  1379. } else if (new_head == mqtt_mq_get(mq, 0)) {
  1380. /* do nothing */
  1381. return;
  1382. }
  1383. /* move buffered data */
  1384. size_t n = (size_t)(mq->curr - new_head->start);
  1385. size_t removing = (size_t)(new_head->start - (uint8_t*) mq->mem_start);
  1386. memmove(mq->mem_start, new_head->start, n);
  1387. mq->curr = (uint8_t *)((uint8_t *)mq->mem_start + n); //@@@
  1388. /* move queue */
  1389. ssize_t new_tail_idx = new_head - mq->queue_tail;
  1390. memmove(mqtt_mq_get(mq, new_tail_idx), mq->queue_tail, (sizeof(struct mqtt_queued_message) * (size_t)(new_tail_idx + 1)));
  1391. mq->queue_tail = mqtt_mq_get(mq, new_tail_idx);
  1392. /* bump back start's */
  1393. for(ssize_t i = 0; i < new_tail_idx + 1; ++i) {
  1394. mqtt_mq_get(mq, i)->start -= removing;
  1395. }
  1396. /* get curr_sz */
  1397. mq->curr_sz = (size_t)(mqtt_mq_currsz(mq));
  1398. }
  1399. struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQTTControlPacketType control_type, uint16_t *packet_id)
  1400. {
  1401. struct mqtt_queued_message *curr;
  1402. for(curr = mqtt_mq_get(mq, 0); curr >= mq->queue_tail; --curr) {
  1403. if (curr->control_type == control_type) {
  1404. if ((packet_id == NULL && curr->state != MQTT_QUEUED_COMPLETE) ||
  1405. (packet_id != NULL && *packet_id == curr->packet_id)) {
  1406. return curr;
  1407. }
  1408. }
  1409. }
  1410. return NULL;
  1411. }
  1412. /* RESPONSE UNPACKING */
  1413. ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, size_t bufsz) {
  1414. const uint8_t *const start = buf;
  1415. ssize_t rv = mqtt_unpack_fixed_header(response, buf, bufsz);
  1416. if (rv <= 0) return rv;
  1417. else buf += rv;
  1418. switch(response->fixed_header.control_type) {
  1419. case MQTT_CONTROL_CONNACK:
  1420. rv = mqtt_unpack_connack_response(response, buf);
  1421. break;
  1422. case MQTT_CONTROL_PUBLISH:
  1423. rv = mqtt_unpack_publish_response(response, buf);
  1424. break;
  1425. case MQTT_CONTROL_PUBACK:
  1426. rv = mqtt_unpack_pubxxx_response(response, buf);
  1427. break;
  1428. case MQTT_CONTROL_PUBREC:
  1429. rv = mqtt_unpack_pubxxx_response(response, buf);
  1430. break;
  1431. case MQTT_CONTROL_PUBREL:
  1432. rv = mqtt_unpack_pubxxx_response(response, buf);
  1433. break;
  1434. case MQTT_CONTROL_PUBCOMP:
  1435. rv = mqtt_unpack_pubxxx_response(response, buf);
  1436. break;
  1437. case MQTT_CONTROL_SUBACK:
  1438. rv = mqtt_unpack_suback_response(response, buf);
  1439. break;
  1440. case MQTT_CONTROL_UNSUBACK:
  1441. rv = mqtt_unpack_unsuback_response(response, buf);
  1442. break;
  1443. case MQTT_CONTROL_PINGRESP:
  1444. return rv;
  1445. default:
  1446. return MQTT_ERROR_RESPONSE_INVALID_CONTROL_TYPE;
  1447. }
  1448. if (rv < 0) return rv;
  1449. buf += rv;
  1450. return buf - start;
  1451. }
  1452. /* EXTRA DETAILS */
  1453. ssize_t __mqtt_pack_str(uint8_t *buf, const char* str) {
  1454. uint16_t length = (uint16_t)strlen(str);
  1455. /* pack string length */
  1456. *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(length);
  1457. buf += 2;
  1458. /* pack string */
  1459. for(int i = 0; i < length; ++i) {
  1460. *(buf++) = (uint8_t)str[i];
  1461. }
  1462. /* return number of bytes consumed */
  1463. return length + 2;
  1464. }
  1465. static const char *MQTT_ERRORS_STR[] = {
  1466. "MQTT_UNKNOWN_ERROR",
  1467. __ALL_MQTT_ERRORS(GENERATE_STRING)
  1468. };
  1469. const char* mqtt_error_str(enum MQTTErrors error) {
  1470. int offset = error - MQTT_ERROR_UNKNOWN;
  1471. if (offset >= 0) {
  1472. return MQTT_ERRORS_STR[offset];
  1473. } else if (error == 0) {
  1474. return "MQTT_ERROR: Buffer too small.";
  1475. } else if (error > 0) {
  1476. return "MQTT_OK";
  1477. } else {
  1478. return MQTT_ERRORS_STR[0];
  1479. }
  1480. }
  1481. /** @endcond*/