1 /** 2 * @file 3 * MQTT client 4 * 5 * @defgroup mqtt MQTT client 6 * @ingroup apps 7 * @verbinclude mqtt_client.txt 8 */ 9 10 /* 11 * Copyright (c) 2016 Erik Andersson <erian747@gmail.com> 12 * All rights reserved. 13 * 14 * Redistribution and use in source and binary forms, with or without modification, 15 * are permitted provided that the following conditions are met: 16 * 17 * 1. Redistributions of source code must retain the above copyright notice, 18 * this list of conditions and the following disclaimer. 19 * 2. Redistributions in binary form must reproduce the above copyright notice, 20 * this list of conditions and the following disclaimer in the documentation 21 * and/or other materials provided with the distribution. 22 * 3. The name of the author may not be used to endorse or promote products 23 * derived from this software without specific prior written permission. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 26 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 27 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 28 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 29 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 30 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING 33 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY 34 * OF SUCH DAMAGE. 35 * 36 * This file is part of the lwIP TCP/IP stack 37 * 38 * Author: Erik Andersson <erian747@gmail.com> 39 * 40 * 41 * @todo: 42 * - Handle large outgoing payloads for PUBLISH messages 43 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics) 44 * - Add support for legacy MQTT protocol version 45 * 46 * Please coordinate changes and requests with Erik Andersson 47 * Erik Andersson <erian747@gmail.com> 48 * 49 */ 50 #include "lwip/apps/mqtt.h" 51 #include "lwip/apps/mqtt_priv.h" 52 #include "lwip/timeouts.h" 53 #include "lwip/ip_addr.h" 54 #include "lwip/mem.h" 55 #include "lwip/err.h" 56 #include "lwip/pbuf.h" 57 #include "lwip/altcp.h" 58 #include "lwip/altcp_tcp.h" 59 #include "lwip/altcp_tls.h" 60 #include <string.h> 61 62 #if LWIP_TCP && LWIP_CALLBACK_API 63 64 /** 65 * MQTT_DEBUG: Default is off. 66 */ 67 #if !defined MQTT_DEBUG || defined __DOXYGEN__ 68 #define MQTT_DEBUG LWIP_DBG_OFF 69 #endif 70 71 #define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE) 72 #define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE) 73 #define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING) 74 #define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE) 75 #define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS) 76 77 78 79 /** 80 * MQTT client connection states 81 */ 82 enum { 83 TCP_DISCONNECTED, 84 TCP_CONNECTING, 85 MQTT_CONNECTING, 86 MQTT_CONNECTED 87 }; 88 89 /** 90 * MQTT control message types 91 */ 92 enum mqtt_message_type { 93 MQTT_MSG_TYPE_CONNECT = 1, 94 MQTT_MSG_TYPE_CONNACK = 2, 95 MQTT_MSG_TYPE_PUBLISH = 3, 96 MQTT_MSG_TYPE_PUBACK = 4, 97 MQTT_MSG_TYPE_PUBREC = 5, 98 MQTT_MSG_TYPE_PUBREL = 6, 99 MQTT_MSG_TYPE_PUBCOMP = 7, 100 MQTT_MSG_TYPE_SUBSCRIBE = 8, 101 MQTT_MSG_TYPE_SUBACK = 9, 102 MQTT_MSG_TYPE_UNSUBSCRIBE = 10, 103 MQTT_MSG_TYPE_UNSUBACK = 11, 104 MQTT_MSG_TYPE_PINGREQ = 12, 105 MQTT_MSG_TYPE_PINGRESP = 13, 106 MQTT_MSG_TYPE_DISCONNECT = 14 107 }; 108 109 /** Helpers to extract control packet type and qos from first byte in fixed header */ 110 #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4) 111 #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1) 112 113 /** 114 * MQTT connect flags, only used in CONNECT message 115 */ 116 enum mqtt_connect_flag { 117 MQTT_CONNECT_FLAG_USERNAME = 1 << 7, 118 MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, 119 MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, 120 MQTT_CONNECT_FLAG_WILL = 1 << 2, 121 MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 122 }; 123 124 125 static void mqtt_cyclic_timer(void *arg); 126 127 #if defined(LWIP_DEBUG) 128 static const char *const mqtt_message_type_str[15] = { 129 "UNDEFINED", 130 "CONNECT", 131 "CONNACK", 132 "PUBLISH", 133 "PUBACK", 134 "PUBREC", 135 "PUBREL", 136 "PUBCOMP", 137 "SUBSCRIBE", 138 "SUBACK", 139 "UNSUBSCRIBE", 140 "UNSUBACK", 141 "PINGREQ", 142 "PINGRESP", 143 "DISCONNECT" 144 }; 145 146 /** 147 * Message type value to string 148 * @param msg_type see enum mqtt_message_type 149 * 150 * @return Control message type text string 151 */ 152 static const char * 153 mqtt_msg_type_to_str(u8_t msg_type) 154 { 155 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) { 156 msg_type = 0; 157 } 158 return mqtt_message_type_str[msg_type]; 159 } 160 161 #endif 162 163 164 /** 165 * Generate MQTT packet identifier 166 * @param client MQTT client 167 * @return New packet identifier, range 1 to 65535 168 */ 169 static u16_t 170 msg_generate_packet_id(mqtt_client_t *client) 171 { 172 client->pkt_id_seq++; 173 if (client->pkt_id_seq == 0) { 174 client->pkt_id_seq++; 175 } 176 return client->pkt_id_seq; 177 } 178 179 /*--------------------------------------------------------------------------------------------------------------------- */ 180 /* Output ring buffer */ 181 182 /** Add single item to ring buffer */ 183 static void 184 mqtt_ringbuf_put(struct mqtt_ringbuf_t *rb, u8_t item) 185 { 186 rb->buf[rb->put] = item; 187 rb->put++; 188 if (rb->put >= MQTT_OUTPUT_RINGBUF_SIZE) { 189 rb->put = 0; 190 } 191 } 192 193 /** Return pointer to ring buffer get position */ 194 static u8_t * 195 mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t *rb) 196 { 197 return &rb->buf[rb->get]; 198 } 199 200 static void 201 mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t *rb, u16_t len) 202 { 203 LWIP_ASSERT("mqtt_ringbuf_advance_get_idx: len < MQTT_OUTPUT_RINGBUF_SIZE", len < MQTT_OUTPUT_RINGBUF_SIZE); 204 205 rb->get += len; 206 if (rb->get >= MQTT_OUTPUT_RINGBUF_SIZE) { 207 rb->get = rb->get - MQTT_OUTPUT_RINGBUF_SIZE; 208 } 209 } 210 211 /** Return number of bytes in ring buffer */ 212 static u16_t 213 mqtt_ringbuf_len(struct mqtt_ringbuf_t *rb) 214 { 215 u32_t len = rb->put - rb->get; 216 if (len > 0xFFFF) { 217 len += MQTT_OUTPUT_RINGBUF_SIZE; 218 } 219 return (u16_t)len; 220 } 221 222 /** Return number of bytes free in ring buffer */ 223 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb)) 224 225 /** Return number of bytes possible to read without wrapping around */ 226 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get)) 227 228 /** 229 * Try send as many bytes as possible from output ring buffer 230 * @param rb Output ring buffer 231 * @param tpcb TCP connection handle 232 */ 233 static void 234 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb) 235 { 236 err_t err; 237 u8_t wrap = 0; 238 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb); 239 u16_t send_len = altcp_sndbuf(tpcb); 240 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL); 241 242 if (send_len == 0 || ringbuf_lin_len == 0) { 243 return; 244 } 245 246 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n", 247 send_len, ringbuf_lin_len, rb->get, rb->put)); 248 249 if (send_len > ringbuf_lin_len) { 250 /* Space in TCP output buffer is larger than available in ring buffer linear portion */ 251 send_len = ringbuf_lin_len; 252 /* Wrap around if more data in ring buffer after linear portion */ 253 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len); 254 } 255 err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0)); 256 if ((err == ERR_OK) && wrap) { 257 mqtt_ringbuf_advance_get_idx(rb, send_len); 258 /* Use the lesser one of ring buffer linear length and TCP send buffer size */ 259 send_len = LWIP_MIN(altcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb)); 260 err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY); 261 } 262 263 if (err == ERR_OK) { 264 mqtt_ringbuf_advance_get_idx(rb, send_len); 265 /* Flush */ 266 altcp_output(tpcb); 267 } else { 268 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err))); 269 } 270 } 271 272 273 274 /*--------------------------------------------------------------------------------------------------------------------- */ 275 /* Request queue */ 276 277 /** 278 * Create request item 279 * @param r_objs Pointer to request objects 280 * @param r_objs_len Number of array entries 281 * @param pkt_id Packet identifier of request 282 * @param cb Packet callback to call when requests lifetime ends 283 * @param arg Parameter following callback 284 * @return Request or NULL if failed to create 285 */ 286 static struct mqtt_request_t * 287 mqtt_create_request(struct mqtt_request_t *r_objs, size_t r_objs_len, u16_t pkt_id, mqtt_request_cb_t cb, void *arg) 288 { 289 struct mqtt_request_t *r = NULL; 290 u8_t n; 291 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL); 292 for (n = 0; n < r_objs_len; n++) { 293 /* Item point to itself if not in use */ 294 if (r_objs[n].next == &r_objs[n]) { 295 r = &r_objs[n]; 296 r->next = NULL; 297 r->cb = cb; 298 r->arg = arg; 299 r->pkt_id = pkt_id; 300 break; 301 } 302 } 303 return r; 304 } 305 306 307 /** 308 * Append request to pending request queue 309 * @param tail Pointer to request queue tail pointer 310 * @param r Request to append 311 */ 312 static void 313 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r) 314 { 315 struct mqtt_request_t *head = NULL; 316 s16_t time_before = 0; 317 struct mqtt_request_t *iter; 318 319 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL); 320 321 /* Iterate trough queue to find head, and count total timeout time */ 322 for (iter = *tail; iter != NULL; iter = iter->next) { 323 time_before += iter->timeout_diff; 324 head = iter; 325 } 326 327 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT); 328 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before; 329 if (head == NULL) { 330 *tail = r; 331 } else { 332 head->next = r; 333 } 334 } 335 336 337 /** 338 * Delete request item 339 * @param r Request item to delete 340 */ 341 static void 342 mqtt_delete_request(struct mqtt_request_t *r) 343 { 344 if (r != NULL) { 345 r->next = r; 346 } 347 } 348 349 /** 350 * Remove a request item with a specific packet identifier from request queue 351 * @param tail Pointer to request queue tail pointer 352 * @param pkt_id Packet identifier of request to take 353 * @return Request item if found, NULL if not 354 */ 355 static struct mqtt_request_t * 356 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id) 357 { 358 struct mqtt_request_t *iter = NULL, *prev = NULL; 359 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL); 360 /* Search all request for pkt_id */ 361 for (iter = *tail; iter != NULL; iter = iter->next) { 362 if (iter->pkt_id == pkt_id) { 363 break; 364 } 365 prev = iter; 366 } 367 368 /* If request was found */ 369 if (iter != NULL) { 370 /* unchain */ 371 if (prev == NULL) { 372 *tail = iter->next; 373 } else { 374 prev->next = iter->next; 375 } 376 /* If exists, add remaining timeout time for the request to next */ 377 if (iter->next != NULL) { 378 iter->next->timeout_diff += iter->timeout_diff; 379 } 380 iter->next = NULL; 381 } 382 return iter; 383 } 384 385 /** 386 * Handle requests timeout 387 * @param tail Pointer to request queue tail pointer 388 * @param t Time since last call in seconds 389 */ 390 static void 391 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t) 392 { 393 struct mqtt_request_t *r; 394 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL); 395 r = *tail; 396 while (t > 0 && r != NULL) { 397 if (t >= r->timeout_diff) { 398 t -= (u8_t)r->timeout_diff; 399 /* Unchain */ 400 *tail = r->next; 401 /* Notify upper layer about timeout */ 402 if (r->cb != NULL) { 403 r->cb(r->arg, ERR_TIMEOUT); 404 } 405 mqtt_delete_request(r); 406 /* Tail might be be modified in callback, so re-read it in every iteration */ 407 r = *(struct mqtt_request_t *const volatile *)tail; 408 } else { 409 r->timeout_diff -= t; 410 t = 0; 411 } 412 } 413 } 414 415 /** 416 * Free all request items 417 * @param tail Pointer to request queue tail pointer 418 */ 419 static void 420 mqtt_clear_requests(struct mqtt_request_t **tail) 421 { 422 struct mqtt_request_t *iter, *next; 423 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL); 424 for (iter = *tail; iter != NULL; iter = next) { 425 next = iter->next; 426 mqtt_delete_request(iter); 427 } 428 *tail = NULL; 429 } 430 /** 431 * Initialize all request items 432 * @param r_objs Pointer to request objects 433 * @param r_objs_len Number of array entries 434 */ 435 static void 436 mqtt_init_requests(struct mqtt_request_t *r_objs, size_t r_objs_len) 437 { 438 u8_t n; 439 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL); 440 for (n = 0; n < r_objs_len; n++) { 441 /* Item pointing to itself indicates unused */ 442 r_objs[n].next = &r_objs[n]; 443 } 444 } 445 446 /*--------------------------------------------------------------------------------------------------------------------- */ 447 /* Output message build helpers */ 448 449 450 static void 451 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value) 452 { 453 mqtt_ringbuf_put(rb, value); 454 } 455 456 static 457 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value) 458 { 459 mqtt_ringbuf_put(rb, value >> 8); 460 mqtt_ringbuf_put(rb, value & 0xff); 461 } 462 463 static void 464 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length) 465 { 466 u16_t n; 467 for (n = 0; n < length; n++) { 468 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]); 469 } 470 } 471 472 static void 473 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length) 474 { 475 u16_t n; 476 mqtt_ringbuf_put(rb, length >> 8); 477 mqtt_ringbuf_put(rb, length & 0xff); 478 for (n = 0; n < length; n++) { 479 mqtt_ringbuf_put(rb, str[n]); 480 } 481 } 482 483 /** 484 * Append fixed header 485 * @param rb Output ring buffer 486 * @param msg_type see enum mqtt_message_type 487 * @param fdup MQTT DUP flag 488 * @param fqos MQTT QoS field 489 * @param fretain MQTT retain flag 490 * @param r_length Remaining length after fixed header 491 */ 492 493 static void 494 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t fdup, 495 u8_t fqos, u8_t fretain, u16_t r_length) 496 { 497 /* Start with control byte */ 498 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1))); 499 /* Encode remaining length field */ 500 do { 501 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0)); 502 r_length >>= 7; 503 } while (r_length > 0); 504 } 505 506 507 /** 508 * Check output buffer space 509 * @param rb Output ring buffer 510 * @param r_length Remaining length after fixed header 511 * @return 1 if message will fit, 0 if not enough buffer space 512 */ 513 static u8_t 514 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length) 515 { 516 /* Start with length of type byte + remaining length */ 517 u16_t total_len = 1 + r_length; 518 519 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL); 520 521 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/ 522 do { 523 total_len++; 524 r_length >>= 7; 525 } while (r_length > 0); 526 527 return (total_len <= mqtt_ringbuf_free(rb)); 528 } 529 530 531 /** 532 * Close connection to server 533 * @param client MQTT client 534 * @param reason Reason for disconnection 535 */ 536 static void 537 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason) 538 { 539 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL); 540 541 /* Bring down TCP connection if not already done */ 542 if (client->conn != NULL) { 543 err_t res; 544 altcp_recv(client->conn, NULL); 545 altcp_err(client->conn, NULL); 546 altcp_sent(client->conn, NULL); 547 res = altcp_close(client->conn); 548 if (res != ERR_OK) { 549 altcp_abort(client->conn); 550 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_close: Close err=%s\n", lwip_strerr(res))); 551 } 552 client->conn = NULL; 553 } 554 555 /* Remove all pending requests */ 556 mqtt_clear_requests(&client->pend_req_queue); 557 /* Stop cyclic timer */ 558 sys_untimeout(mqtt_cyclic_timer, client); 559 560 /* Notify upper layer of disconnection if changed state */ 561 if (client->conn_state != TCP_DISCONNECTED) { 562 563 client->conn_state = TCP_DISCONNECTED; 564 if (client->connect_cb != NULL) { 565 client->connect_cb(client, client->connect_arg, reason); 566 } 567 } 568 } 569 570 571 /** 572 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states 573 * @param arg MQTT client 574 */ 575 static void 576 mqtt_cyclic_timer(void *arg) 577 { 578 u8_t restart_timer = 1; 579 mqtt_client_t *client = (mqtt_client_t *)arg; 580 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL); 581 582 if (client->conn_state == MQTT_CONNECTING) { 583 client->cyclic_tick++; 584 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) { 585 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: CONNECT attempt to server timed out\n")); 586 /* Disconnect TCP */ 587 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 588 restart_timer = 0; 589 } 590 } else if (client->conn_state == MQTT_CONNECTED) { 591 /* Handle timeout for pending requests */ 592 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL); 593 594 /* keep_alive > 0 means keep alive functionality shall be used */ 595 if (client->keep_alive > 0) { 596 597 client->server_watchdog++; 598 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */ 599 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive / 2)) { 600 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Server incoming keep-alive timeout\n")); 601 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 602 restart_timer = 0; 603 } 604 605 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */ 606 client->cyclic_tick++; 607 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) { 608 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: Sending keep-alive message to server\n")); 609 if (mqtt_output_check_space(&client->output, 0) != 0) { 610 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0); 611 client->cyclic_tick = 0; 612 } 613 } 614 } 615 } else { 616 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state)); 617 restart_timer = 0; 618 } 619 if (restart_timer) { 620 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, arg); 621 } 622 } 623 624 625 /** 626 * Send PUBACK, PUBREC or PUBREL response message 627 * @param client MQTT client 628 * @param msg PUBACK, PUBREC or PUBREL 629 * @param pkt_id Packet identifier 630 * @param qos QoS value 631 * @return ERR_OK if successful, ERR_MEM if out of memory 632 */ 633 static err_t 634 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos) 635 { 636 err_t err = ERR_OK; 637 if (mqtt_output_check_space(&client->output, 2)) { 638 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2); 639 mqtt_output_append_u16(&client->output, pkt_id); 640 mqtt_output_send(&client->output, client->conn); 641 } else { 642 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n", 643 mqtt_msg_type_to_str(msg), pkt_id)); 644 err = ERR_MEM; 645 } 646 return err; 647 } 648 649 /** 650 * Subscribe response from server 651 * @param r Matching request 652 * @param result Result code from server 653 */ 654 static void 655 mqtt_incoming_suback(struct mqtt_request_t *r, u8_t result) 656 { 657 if (r->cb != NULL) { 658 r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT); 659 } 660 } 661 662 663 /** 664 * Complete MQTT message received or buffer full 665 * @param client MQTT client 666 * @param fixed_hdr_len length of fixed header 667 * @param length length received part 668 * @param remaining_length Remaining length of complete message 669 */ 670 static mqtt_connection_status_t 671 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u32_t remaining_length, 672 u8_t *var_hdr_payload) 673 { 674 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; 675 676 /* Control packet type */ 677 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); 678 u16_t pkt_id = 0; 679 680 LWIP_ASSERT("fixed_hdr_len <= client->msg_idx", fixed_hdr_len <= client->msg_idx); 681 LWIP_ERROR("buffer length mismatch", fixed_hdr_len + length <= MQTT_VAR_HEADER_BUFFER_LEN, 682 return MQTT_CONNECT_DISCONNECTED); 683 684 if (pkt_type == MQTT_MSG_TYPE_CONNACK) { 685 if (client->conn_state == MQTT_CONNECTING) { 686 if (length < 2) { 687 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short CONNACK message\n")); 688 goto out_disconnect; 689 } 690 /* Get result code from CONNACK */ 691 res = (mqtt_connection_status_t)var_hdr_payload[1]; 692 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: Connect response code %d\n", res)); 693 if (res == MQTT_CONNECT_ACCEPTED) { 694 /* Reset cyclic_tick when changing to connected state */ 695 client->cyclic_tick = 0; 696 client->conn_state = MQTT_CONNECTED; 697 /* Notify upper layer */ 698 if (client->connect_cb != NULL) { 699 client->connect_cb(client, client->connect_arg, res); 700 } 701 } 702 } else { 703 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Received CONNACK in connected state\n")); 704 } 705 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) { 706 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ( "mqtt_message_received: Received PINGRESP from server\n")); 707 708 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) { 709 u16_t payload_offset = 0; 710 u16_t payload_length = length; 711 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]); 712 713 if (client->msg_idx == (u32_t)(fixed_hdr_len + length)) { 714 /* First publish message frame. Should have topic and pkt id*/ 715 size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_len; 716 u8_t *topic; 717 u16_t after_topic; 718 u8_t bkp; 719 u16_t topic_len; 720 u16_t qos_len = (qos ? 2U : 0U); 721 if (length < 2 + qos_len) { 722 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet\n")); 723 goto out_disconnect; 724 } 725 topic_len = var_hdr_payload[0]; 726 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]); 727 if ((topic_len > length - (2 + qos_len)) || 728 (topic_len > var_hdr_payload_bufsize - (2 + qos_len))) { 729 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (topic)\n")); 730 goto out_disconnect; 731 } 732 733 topic = var_hdr_payload + 2; 734 after_topic = 2 + topic_len; 735 /* Check buffer length, add one byte even for QoS 0 so that zero termination will fit */ 736 if ((after_topic + (qos ? 2U : 1U)) > var_hdr_payload_bufsize) { 737 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n")); 738 goto out_disconnect; 739 } 740 741 /* id for QoS 1 and 2 */ 742 if (qos > 0) { 743 if (length < after_topic + 2U) { 744 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (after_topic)\n")); 745 goto out_disconnect; 746 } 747 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1]; 748 after_topic += 2; 749 } else { 750 client->inpub_pkt_id = 0; 751 } 752 /* Take backup of byte after topic */ 753 bkp = topic[topic_len]; 754 /* Zero terminate string */ 755 topic[topic_len] = 0; 756 /* Payload data remaining in receive buffer */ 757 payload_length = length - after_topic; 758 payload_offset = after_topic; 759 760 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incoming_publish: Received message with QoS %d at topic: %s, payload length %"U32_F"\n", 761 qos, topic, remaining_length + payload_length)); 762 if (client->pub_cb != NULL) { 763 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length); 764 } 765 /* Restore byte after topic */ 766 topic[topic_len] = bkp; 767 } 768 if (payload_length > 0 || remaining_length == 0) { 769 if (length < (size_t)(payload_offset + payload_length)) { 770 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short packet (payload)\n")); 771 goto out_disconnect; 772 } 773 if (client->data_cb != NULL) { 774 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0); 775 } 776 /* Reply if QoS > 0 */ 777 if (remaining_length == 0 && qos > 0) { 778 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */ 779 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC; 780 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incoming_publish: Sending publish response: %s with pkt_id: %d\n", 781 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id)); 782 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0); 783 } 784 } 785 } else { 786 if (length < 2) { 787 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short message\n")); 788 goto out_disconnect; 789 } 790 /* Get packet identifier */ 791 pkt_id = (u16_t)var_hdr_payload[0] << 8; 792 pkt_id |= (u16_t)var_hdr_payload[1]; 793 if (pkt_id == 0) { 794 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Got message with illegal packet identifier: 0\n")); 795 goto out_disconnect; 796 } 797 if (pkt_type == MQTT_MSG_TYPE_PUBREC) { 798 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n", pkt_id)); 799 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1); 800 801 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) { 802 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n", pkt_id)); 803 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0); 804 805 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK || 806 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) { 807 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id); 808 if (r != NULL) { 809 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 810 if (pkt_type == MQTT_MSG_TYPE_SUBACK) { 811 if (length < 3) { 812 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: To small SUBACK packet\n")); 813 goto out_disconnect; 814 } else { 815 mqtt_incoming_suback(r, var_hdr_payload[2]); 816 } 817 } else if (r->cb != NULL) { 818 r->cb(r->arg, ERR_OK); 819 } 820 mqtt_delete_request(r); 821 } else { 822 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 823 } 824 } else { 825 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received unknown message type: %d\n", pkt_type)); 826 goto out_disconnect; 827 } 828 } 829 return res; 830 out_disconnect: 831 return MQTT_CONNECT_DISCONNECTED; 832 } 833 834 835 /** 836 * MQTT incoming message parser 837 * @param client MQTT client 838 * @param p PBUF chain of received data 839 * @return Connection status 840 */ 841 static mqtt_connection_status_t 842 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) 843 { 844 u16_t in_offset = 0; 845 u32_t msg_rem_len = 0; 846 u8_t fixed_hdr_len = 0; 847 u8_t b = 0; 848 849 while (p->tot_len > in_offset) { 850 /* We ALWAYS parse the header here first. Even if the header was not 851 included in this segment, we re-parse it here by buffering it in 852 client->rx_buffer. client->msg_idx keeps track of this. */ 853 if ((fixed_hdr_len < 2) || ((b & 0x80) != 0)) { 854 855 if (fixed_hdr_len < client->msg_idx) { 856 /* parse header from old pbuf (buffered in client->rx_buffer) */ 857 b = client->rx_buffer[fixed_hdr_len]; 858 } else { 859 /* parse header from this pbuf and save it in client->rx_buffer in case 860 it comes in segmented */ 861 b = pbuf_get_at(p, in_offset++); 862 client->rx_buffer[client->msg_idx++] = b; 863 } 864 fixed_hdr_len++; 865 866 if (fixed_hdr_len >= 2) { 867 /* fixed header contains at least 2 bytes but can contain more, depending on 868 'remaining length'. All bytes but the last of this have 0x80 set to 869 indicate more bytes are coming. */ 870 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_len - 2) * 7); 871 if ((b & 0x80) == 0) { 872 /* fixed header is done */ 873 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len)); 874 if (msg_rem_len == 0) { 875 /* Complete message with no extra headers of payload received */ 876 mqtt_message_received(client, fixed_hdr_len, 0, 0, NULL); 877 client->msg_idx = 0; 878 fixed_hdr_len = 0; 879 } else { 880 /* Bytes remaining in message (changes remaining length if this is 881 not the first segment of this message) */ 882 msg_rem_len = (msg_rem_len + fixed_hdr_len) - client->msg_idx; 883 } 884 } 885 } 886 } else { 887 /* Fixed header has been parsed, parse variable header */ 888 u16_t cpy_len, buffer_space; 889 u8_t *var_hdr_payload; 890 mqtt_connection_status_t res; 891 892 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */ 893 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len); 894 895 /* Limit to available space in buffer */ 896 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_len; 897 if (cpy_len > buffer_space) { 898 cpy_len = buffer_space; 899 } 900 /* Adjust cpy_len to ensure zero-copy operation for remaining parts of current message */ 901 if (client->msg_idx >= MQTT_VAR_HEADER_BUFFER_LEN) { 902 if (cpy_len > (p->len - in_offset)) 903 cpy_len = p->len - in_offset; 904 } 905 var_hdr_payload = (u8_t*)pbuf_get_contiguous(p, client->rx_buffer + fixed_hdr_len, 906 buffer_space, cpy_len, in_offset); 907 908 /* Advance get and put indexes */ 909 client->msg_idx += cpy_len; 910 in_offset += cpy_len; 911 msg_rem_len -= cpy_len; 912 913 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len)); 914 /* Whole or partial message received */ 915 res = mqtt_message_received(client, fixed_hdr_len, cpy_len, msg_rem_len, var_hdr_payload); 916 if (res != MQTT_CONNECT_ACCEPTED) { 917 return res; 918 } 919 if (msg_rem_len == 0) { 920 /* Reset parser state */ 921 client->msg_idx = 0; 922 /* msg_tot_len = 0; */ 923 fixed_hdr_len = 0; 924 } 925 } 926 } 927 return MQTT_CONNECT_ACCEPTED; 928 } 929 930 931 /** 932 * TCP received callback function. @see tcp_recv_fn 933 * @param arg MQTT client 934 * @param p PBUF chain of received data 935 * @param err Passed as return value if not ERR_OK 936 * @return ERR_OK or err passed into callback 937 */ 938 static err_t 939 mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err) 940 { 941 mqtt_client_t *client = (mqtt_client_t *)arg; 942 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL); 943 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb); 944 945 if (p == NULL) { 946 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n")); 947 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 948 } else { 949 mqtt_connection_status_t res; 950 if (err != ERR_OK) { 951 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_recv_cb: Recv err=%d\n", err)); 952 pbuf_free(p); 953 return err; 954 } 955 956 /* Tell remote that data has been received */ 957 altcp_recved(pcb, p->tot_len); 958 res = mqtt_parse_incoming(client, p); 959 pbuf_free(p); 960 961 if (res != MQTT_CONNECT_ACCEPTED) { 962 mqtt_close(client, res); 963 } 964 /* If keep alive functionality is used */ 965 if (client->keep_alive != 0) { 966 /* Reset server alive watchdog */ 967 client->server_watchdog = 0; 968 } 969 970 } 971 return ERR_OK; 972 } 973 974 975 /** 976 * TCP data sent callback function. @see tcp_sent_fn 977 * @param arg MQTT client 978 * @param tpcb TCP connection handle 979 * @param len Number of bytes sent 980 * @return ERR_OK 981 */ 982 static err_t 983 mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len) 984 { 985 mqtt_client_t *client = (mqtt_client_t *)arg; 986 987 LWIP_UNUSED_ARG(tpcb); 988 LWIP_UNUSED_ARG(len); 989 990 if (client->conn_state == MQTT_CONNECTED) { 991 struct mqtt_request_t *r; 992 993 /* Reset keep-alive send timer and server watchdog */ 994 client->cyclic_tick = 0; 995 client->server_watchdog = 0; 996 /* QoS 0 publish has no response from server, so call its callbacks here */ 997 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) { 998 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n")); 999 if (r->cb != NULL) { 1000 r->cb(r->arg, ERR_OK); 1001 } 1002 mqtt_delete_request(r); 1003 } 1004 /* Try send any remaining buffers from output queue */ 1005 mqtt_output_send(&client->output, client->conn); 1006 } 1007 return ERR_OK; 1008 } 1009 1010 /** 1011 * TCP error callback function. @see tcp_err_fn 1012 * @param arg MQTT client 1013 * @param err Error encountered 1014 */ 1015 static void 1016 mqtt_tcp_err_cb(void *arg, err_t err) 1017 { 1018 mqtt_client_t *client = (mqtt_client_t *)arg; 1019 LWIP_UNUSED_ARG(err); /* only used for debug output */ 1020 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg)); 1021 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL); 1022 /* Set conn to null before calling close as pcb is already deallocated*/ 1023 client->conn = NULL; 1024 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 1025 } 1026 1027 /** 1028 * TCP poll callback function. @see tcp_poll_fn 1029 * @param arg MQTT client 1030 * @param tpcb TCP connection handle 1031 * @return err ERR_OK 1032 */ 1033 static err_t 1034 mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb) 1035 { 1036 mqtt_client_t *client = (mqtt_client_t *)arg; 1037 if (client->conn_state == MQTT_CONNECTED) { 1038 /* Try send any remaining buffers from output queue */ 1039 mqtt_output_send(&client->output, tpcb); 1040 } 1041 return ERR_OK; 1042 } 1043 1044 /** 1045 * TCP connect callback function. @see tcp_connected_fn 1046 * @param arg MQTT client 1047 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error 1048 * @return ERR_OK 1049 */ 1050 static err_t 1051 mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err) 1052 { 1053 mqtt_client_t *client = (mqtt_client_t *)arg; 1054 1055 if (err != ERR_OK) { 1056 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_connect_cb: TCP connect error %d\n", err)); 1057 return err; 1058 } 1059 1060 /* Initiate receiver state */ 1061 client->msg_idx = 0; 1062 1063 /* Setup TCP callbacks */ 1064 altcp_recv(tpcb, mqtt_tcp_recv_cb); 1065 altcp_sent(tpcb, mqtt_tcp_sent_cb); 1066 altcp_poll(tpcb, mqtt_tcp_poll_cb, 2); 1067 1068 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_connect_cb: TCP connection established to server\n")); 1069 /* Enter MQTT connect state */ 1070 client->conn_state = MQTT_CONNECTING; 1071 1072 /* Start cyclic timer */ 1073 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, client); 1074 client->cyclic_tick = 0; 1075 1076 /* Start transmission from output queue, connect message is the first one out*/ 1077 mqtt_output_send(&client->output, client->conn); 1078 1079 return ERR_OK; 1080 } 1081 1082 1083 1084 /*---------------------------------------------------------------------------------------------------- */ 1085 /* Public API */ 1086 1087 1088 /** 1089 * @ingroup mqtt 1090 * MQTT publish function. 1091 * @param client MQTT client 1092 * @param topic Publish topic string 1093 * @param payload Data to publish (NULL is allowed) 1094 * @param payload_length Length of payload (0 is allowed) 1095 * @param qos Quality of service, 0 1 or 2 1096 * @param retain MQTT retain flag 1097 * @param cb Callback to call when publish is complete or has timed out 1098 * @param arg User supplied argument to publish callback 1099 * @return ERR_OK if successful 1100 * ERR_CONN if client is disconnected 1101 * ERR_MEM if short on memory 1102 */ 1103 err_t 1104 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, 1105 mqtt_request_cb_t cb, void *arg) 1106 { 1107 struct mqtt_request_t *r; 1108 u16_t pkt_id; 1109 size_t topic_strlen; 1110 size_t total_len; 1111 u16_t topic_len; 1112 u16_t remaining_length; 1113 1114 LWIP_ASSERT_CORE_LOCKED(); 1115 LWIP_ASSERT("mqtt_publish: client != NULL", client); 1116 LWIP_ASSERT("mqtt_publish: topic != NULL", topic); 1117 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN); 1118 1119 topic_strlen = strlen(topic); 1120 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 1121 topic_len = (u16_t)topic_strlen; 1122 total_len = 2 + topic_len + payload_length; 1123 1124 if (qos > 0) { 1125 total_len += 2; 1126 /* Generate pkt_id id for QoS1 and 2 */ 1127 pkt_id = msg_generate_packet_id(client); 1128 } else { 1129 /* Use reserved value pkt_id 0 for QoS 0 in request handle */ 1130 pkt_id = 0; 1131 } 1132 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 1133 remaining_length = (u16_t)total_len; 1134 1135 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic)); 1136 1137 r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg); 1138 if (r == NULL) { 1139 return ERR_MEM; 1140 } 1141 1142 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 1143 mqtt_delete_request(r); 1144 return ERR_MEM; 1145 } 1146 /* Append fixed header */ 1147 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length); 1148 1149 /* Append Topic */ 1150 mqtt_output_append_string(&client->output, topic, topic_len); 1151 1152 /* Append packet if for QoS 1 and 2*/ 1153 if (qos > 0) { 1154 mqtt_output_append_u16(&client->output, pkt_id); 1155 } 1156 1157 /* Append optional publish payload */ 1158 if ((payload != NULL) && (payload_length > 0)) { 1159 mqtt_output_append_buf(&client->output, payload, payload_length); 1160 } 1161 1162 mqtt_append_request(&client->pend_req_queue, r); 1163 mqtt_output_send(&client->output, client->conn); 1164 return ERR_OK; 1165 } 1166 1167 1168 /** 1169 * @ingroup mqtt 1170 * MQTT subscribe/unsubscribe function. 1171 * @param client MQTT client 1172 * @param topic topic to subscribe to 1173 * @param qos Quality of service, 0 1 or 2 (only used for subscribe) 1174 * @param cb Callback to call when subscribe/unsubscribe response is received 1175 * @param arg User supplied argument to publish callback 1176 * @param sub 1 for subscribe, 0 for unsubscribe 1177 * @return ERR_OK if successful, @see err_t enum for other results 1178 */ 1179 err_t 1180 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub) 1181 { 1182 size_t topic_strlen; 1183 size_t total_len; 1184 u16_t topic_len; 1185 u16_t remaining_length; 1186 u16_t pkt_id; 1187 struct mqtt_request_t *r; 1188 1189 LWIP_ASSERT_CORE_LOCKED(); 1190 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client); 1191 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic); 1192 1193 topic_strlen = strlen(topic); 1194 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 1195 topic_len = (u16_t)topic_strlen; 1196 /* Topic string, pkt_id, qos for subscribe */ 1197 total_len = topic_len + 2 + 2 + (sub != 0); 1198 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 1199 remaining_length = (u16_t)total_len; 1200 1201 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3); 1202 if (client->conn_state == TCP_DISCONNECTED) { 1203 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n")); 1204 return ERR_CONN; 1205 } 1206 1207 pkt_id = msg_generate_packet_id(client); 1208 r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg); 1209 if (r == NULL) { 1210 return ERR_MEM; 1211 } 1212 1213 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 1214 mqtt_delete_request(r); 1215 return ERR_MEM; 1216 } 1217 1218 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id)); 1219 1220 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length); 1221 /* Packet id */ 1222 mqtt_output_append_u16(&client->output, pkt_id); 1223 /* Topic */ 1224 mqtt_output_append_string(&client->output, topic, topic_len); 1225 /* QoS */ 1226 if (sub != 0) { 1227 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2)); 1228 } 1229 1230 mqtt_append_request(&client->pend_req_queue, r); 1231 mqtt_output_send(&client->output, client->conn); 1232 return ERR_OK; 1233 } 1234 1235 1236 /** 1237 * @ingroup mqtt 1238 * Set callback to handle incoming publish requests from server 1239 * @param client MQTT client 1240 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload 1241 * @param data_cb Callback for each fragment of payload that arrives 1242 * @param arg User supplied argument to both callbacks 1243 */ 1244 void 1245 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb, 1246 mqtt_incoming_data_cb_t data_cb, void *arg) 1247 { 1248 LWIP_ASSERT_CORE_LOCKED(); 1249 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL); 1250 client->data_cb = data_cb; 1251 client->pub_cb = pub_cb; 1252 client->inpub_arg = arg; 1253 } 1254 1255 /** 1256 * @ingroup mqtt 1257 * Create a new MQTT client instance 1258 * @return Pointer to instance on success, NULL otherwise 1259 */ 1260 mqtt_client_t * 1261 mqtt_client_new(void) 1262 { 1263 LWIP_ASSERT_CORE_LOCKED(); 1264 return (mqtt_client_t *)mem_calloc(1, sizeof(mqtt_client_t)); 1265 } 1266 1267 /** 1268 * @ingroup mqtt 1269 * Free MQTT client instance 1270 * @param client Pointer to instance to be freed 1271 */ 1272 void 1273 mqtt_client_free(mqtt_client_t *client) 1274 { 1275 mem_free(client); 1276 } 1277 1278 /** 1279 * @ingroup mqtt 1280 * Connect to MQTT server 1281 * @param client MQTT client 1282 * @param ip_addr Server IP 1283 * @param port Server port 1284 * @param cb Connection state change callback 1285 * @param arg User supplied argument to connection callback 1286 * @param client_info Client identification and connection options 1287 * @return ERR_OK if successful, @see err_t enum for other results 1288 */ 1289 err_t 1290 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg, 1291 const struct mqtt_connect_client_info_t *client_info) 1292 { 1293 err_t err; 1294 size_t len; 1295 u16_t client_id_length; 1296 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */ 1297 u16_t remaining_length = 2 + 4 + 1 + 1 + 2; 1298 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0; 1299 u16_t client_user_len = 0, client_pass_len = 0; 1300 mqtt_incoming_data_cb_t data_cb; 1301 mqtt_incoming_publish_cb_t pub_cb; 1302 void *inpub_arg; 1303 1304 LWIP_ASSERT_CORE_LOCKED(); 1305 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL); 1306 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL); 1307 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL); 1308 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL); 1309 1310 if (client->conn_state != TCP_DISCONNECTED) { 1311 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Already connected\n")); 1312 return ERR_ISCONN; 1313 } 1314 1315 /* Wipe clean, but keep callbacks */ 1316 data_cb = client->data_cb; 1317 pub_cb = client->pub_cb; 1318 inpub_arg = client->inpub_arg; 1319 memset(client, 0, sizeof(mqtt_client_t)); 1320 client->data_cb = data_cb; 1321 client->pub_cb = pub_cb; 1322 client->inpub_arg = inpub_arg; 1323 1324 client->connect_arg = arg; 1325 client->connect_cb = cb; 1326 client->keep_alive = client_info->keep_alive; 1327 mqtt_init_requests(client->req_list, LWIP_ARRAYSIZE(client->req_list)); 1328 1329 /* Build connect message */ 1330 if (client_info->will_topic != NULL && client_info->will_msg != NULL) { 1331 flags |= MQTT_CONNECT_FLAG_WILL; 1332 flags |= (client_info->will_qos & 3) << 3; 1333 if (client_info->will_retain) { 1334 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; 1335 } 1336 len = strlen(client_info->will_topic); 1337 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL); 1338 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL); 1339 will_topic_len = (u8_t)len; 1340 len = strlen(client_info->will_msg); 1341 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL); 1342 will_msg_len = (u8_t)len; 1343 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len; 1344 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1345 remaining_length = (u16_t)len; 1346 } 1347 if (client_info->client_user != NULL) { 1348 flags |= MQTT_CONNECT_FLAG_USERNAME; 1349 len = strlen(client_info->client_user); 1350 LWIP_ERROR("mqtt_client_connect: client_info->client_user length overflow", len <= 0xFFFF, return ERR_VAL); 1351 LWIP_ERROR("mqtt_client_connect: client_info->client_user length must be > 0", len > 0, return ERR_VAL); 1352 client_user_len = (u16_t)len; 1353 len = remaining_length + 2 + client_user_len; 1354 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1355 remaining_length = (u16_t)len; 1356 } 1357 if (client_info->client_pass != NULL) { 1358 flags |= MQTT_CONNECT_FLAG_PASSWORD; 1359 len = strlen(client_info->client_pass); 1360 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length overflow", len <= 0xFFFF, return ERR_VAL); 1361 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length must be > 0", len > 0, return ERR_VAL); 1362 client_pass_len = (u16_t)len; 1363 len = remaining_length + 2 + client_pass_len; 1364 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1365 remaining_length = (u16_t)len; 1366 } 1367 1368 /* Don't complicate things, always connect using clean session */ 1369 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; 1370 1371 len = strlen(client_info->client_id); 1372 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL); 1373 client_id_length = (u16_t)len; 1374 len = remaining_length + 2 + client_id_length; 1375 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1376 remaining_length = (u16_t)len; 1377 1378 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 1379 return ERR_MEM; 1380 } 1381 1382 #if LWIP_ALTCP && LWIP_ALTCP_TLS 1383 if (client_info->tls_config) { 1384 client->conn = altcp_tls_new(client_info->tls_config, IP_GET_TYPE(ip_addr)); 1385 } else 1386 #endif 1387 { 1388 client->conn = altcp_tcp_new_ip_type(IP_GET_TYPE(ip_addr)); 1389 } 1390 if (client->conn == NULL) { 1391 return ERR_MEM; 1392 } 1393 1394 /* Set arg pointer for callbacks */ 1395 altcp_arg(client->conn, client); 1396 /* Any local address, pick random local port number */ 1397 err = altcp_bind(client->conn, IP_ADDR_ANY, 0); 1398 if (err != ERR_OK) { 1399 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Error binding to local ip/port, %d\n", err)); 1400 goto tcp_fail; 1401 } 1402 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port)); 1403 1404 /* Connect to server */ 1405 err = altcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb); 1406 if (err != ERR_OK) { 1407 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err)); 1408 goto tcp_fail; 1409 } 1410 /* Set error callback */ 1411 altcp_err(client->conn, mqtt_tcp_err_cb); 1412 client->conn_state = TCP_CONNECTING; 1413 1414 /* Append fixed header */ 1415 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length); 1416 /* Append Protocol string */ 1417 mqtt_output_append_string(&client->output, "MQTT", 4); 1418 /* Append Protocol level */ 1419 mqtt_output_append_u8(&client->output, 4); 1420 /* Append connect flags */ 1421 mqtt_output_append_u8(&client->output, flags); 1422 /* Append keep-alive */ 1423 mqtt_output_append_u16(&client->output, client_info->keep_alive); 1424 /* Append client id */ 1425 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length); 1426 /* Append will message if used */ 1427 if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) { 1428 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len); 1429 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len); 1430 } 1431 /* Append user name if given */ 1432 if ((flags & MQTT_CONNECT_FLAG_USERNAME) != 0) { 1433 mqtt_output_append_string(&client->output, client_info->client_user, client_user_len); 1434 } 1435 /* Append password if given */ 1436 if ((flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) { 1437 mqtt_output_append_string(&client->output, client_info->client_pass, client_pass_len); 1438 } 1439 return ERR_OK; 1440 1441 tcp_fail: 1442 altcp_abort(client->conn); 1443 client->conn = NULL; 1444 return err; 1445 } 1446 1447 1448 /** 1449 * @ingroup mqtt 1450 * Disconnect from MQTT server 1451 * @param client MQTT client 1452 */ 1453 void 1454 mqtt_disconnect(mqtt_client_t *client) 1455 { 1456 LWIP_ASSERT_CORE_LOCKED(); 1457 LWIP_ASSERT("mqtt_disconnect: client != NULL", client); 1458 /* If connection in not already closed */ 1459 if (client->conn_state != TCP_DISCONNECTED) { 1460 /* Set conn_state before calling mqtt_close to prevent callback from being called */ 1461 client->conn_state = TCP_DISCONNECTED; 1462 mqtt_close(client, (mqtt_connection_status_t)0); 1463 } 1464 } 1465 1466 /** 1467 * @ingroup mqtt 1468 * Check connection with server 1469 * @param client MQTT client 1470 * @return 1 if connected to server, 0 otherwise 1471 */ 1472 u8_t 1473 mqtt_client_is_connected(mqtt_client_t *client) 1474 { 1475 LWIP_ASSERT_CORE_LOCKED(); 1476 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client); 1477 return client->conn_state == MQTT_CONNECTED; 1478 } 1479 1480 #endif /* LWIP_TCP && LWIP_CALLBACK_API */ 1481