1 /** 2 * @file 3 * MQTT client 4 * 5 * @defgroup mqtt MQTT client 6 * @ingroup apps 7 * @verbinclude mqtt_client.txt 8 */ 9 10 /* 11 * Copyright (c) 2016 Erik Andersson <erian747@gmail.com> 12 * All rights reserved. 13 * 14 * Redistribution and use in source and binary forms, with or without modification, 15 * are permitted provided that the following conditions are met: 16 * 17 * 1. Redistributions of source code must retain the above copyright notice, 18 * this list of conditions and the following disclaimer. 19 * 2. Redistributions in binary form must reproduce the above copyright notice, 20 * this list of conditions and the following disclaimer in the documentation 21 * and/or other materials provided with the distribution. 22 * 3. The name of the author may not be used to endorse or promote products 23 * derived from this software without specific prior written permission. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 26 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 27 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 28 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 29 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 30 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING 33 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY 34 * OF SUCH DAMAGE. 35 * 36 * This file is part of the lwIP TCP/IP stack 37 * 38 * Author: Erik Andersson <erian747@gmail.com> 39 * 40 * 41 * @todo: 42 * - Handle large outgoing payloads for PUBLISH messages 43 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics) 44 * - Add support for legacy MQTT protocol version 45 * 46 * Please coordinate changes and requests with Erik Andersson 47 * Erik Andersson <erian747@gmail.com> 48 * 49 */ 50 #include "lwip/apps/mqtt.h" 51 #include "lwip/timeouts.h" 52 #include "lwip/ip_addr.h" 53 #include "lwip/mem.h" 54 #include "lwip/err.h" 55 #include "lwip/pbuf.h" 56 #include "lwip/tcp.h" 57 #include <string.h> 58 59 #if LWIP_TCP && LWIP_CALLBACK_API 60 61 /** 62 * MQTT_DEBUG: Default is off. 63 */ 64 #if !defined MQTT_DEBUG || defined __DOXYGEN__ 65 #define MQTT_DEBUG LWIP_DBG_OFF 66 #endif 67 68 #define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE) 69 #define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE) 70 #define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING) 71 #define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE) 72 #define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS) 73 74 static void mqtt_cyclic_timer(void *arg); 75 76 /** 77 * MQTT client connection states 78 */ 79 enum { 80 TCP_DISCONNECTED, 81 TCP_CONNECTING, 82 MQTT_CONNECTING, 83 MQTT_CONNECTED 84 }; 85 86 /** 87 * MQTT control message types 88 */ 89 enum mqtt_message_type { 90 MQTT_MSG_TYPE_CONNECT = 1, 91 MQTT_MSG_TYPE_CONNACK = 2, 92 MQTT_MSG_TYPE_PUBLISH = 3, 93 MQTT_MSG_TYPE_PUBACK = 4, 94 MQTT_MSG_TYPE_PUBREC = 5, 95 MQTT_MSG_TYPE_PUBREL = 6, 96 MQTT_MSG_TYPE_PUBCOMP = 7, 97 MQTT_MSG_TYPE_SUBSCRIBE = 8, 98 MQTT_MSG_TYPE_SUBACK = 9, 99 MQTT_MSG_TYPE_UNSUBSCRIBE = 10, 100 MQTT_MSG_TYPE_UNSUBACK = 11, 101 MQTT_MSG_TYPE_PINGREQ = 12, 102 MQTT_MSG_TYPE_PINGRESP = 13, 103 MQTT_MSG_TYPE_DISCONNECT = 14 104 }; 105 106 /** Helpers to extract control packet type and qos from first byte in fixed header */ 107 #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4) 108 #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1) 109 110 /** 111 * MQTT connect flags, only used in CONNECT message 112 */ 113 enum mqtt_connect_flag { 114 MQTT_CONNECT_FLAG_USERNAME = 1 << 7, 115 MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, 116 MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, 117 MQTT_CONNECT_FLAG_WILL = 1 << 2, 118 MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 119 }; 120 121 122 #if defined(LWIP_DEBUG) 123 static const char * const mqtt_message_type_str[15] = 124 { 125 "UNDEFINED", 126 "CONNECT", 127 "CONNACK", 128 "PUBLISH", 129 "PUBACK", 130 "PUBREC", 131 "PUBREL", 132 "PUBCOMP", 133 "SUBSCRIBE", 134 "SUBACK", 135 "UNSUBSCRIBE", 136 "UNSUBACK", 137 "PINGREQ", 138 "PINGRESP", 139 "DISCONNECT" 140 }; 141 142 /** 143 * Message type value to string 144 * @param msg_type see enum mqtt_message_type 145 * 146 * @return Control message type text string 147 */ 148 static const char * 149 mqtt_msg_type_to_str(u8_t msg_type) 150 { 151 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) { 152 msg_type = 0; 153 } 154 return mqtt_message_type_str[msg_type]; 155 } 156 157 #endif 158 159 160 /** 161 * Generate MQTT packet identifier 162 * @param client MQTT client 163 * @return New packet identifier, range 1 to 65535 164 */ 165 static u16_t 166 msg_generate_packet_id(mqtt_client_t *client) 167 { 168 client->pkt_id_seq++; 169 if (client->pkt_id_seq == 0) { 170 client->pkt_id_seq++; 171 } 172 return client->pkt_id_seq; 173 } 174 175 /*--------------------------------------------------------------------------------------------------------------------- */ 176 /* Output ring buffer */ 177 178 179 #define MQTT_RINGBUF_IDX_MASK ((MQTT_OUTPUT_RINGBUF_SIZE) - 1) 180 181 /** Add single item to ring buffer */ 182 #define mqtt_ringbuf_put(rb, item) ((rb)->buf)[(rb)->put++ & MQTT_RINGBUF_IDX_MASK] = (item) 183 184 /** Return number of bytes in ring buffer */ 185 #define mqtt_ringbuf_len(rb) ((u16_t)((rb)->put - (rb)->get)) 186 187 /** Return number of bytes free in ring buffer */ 188 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb)) 189 190 /** Return number of bytes possible to read without wrapping around */ 191 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - ((rb)->get & MQTT_RINGBUF_IDX_MASK))) 192 193 /** Return pointer to ring buffer get position */ 194 #define mqtt_ringbuf_get_ptr(rb) (&(rb)->buf[(rb)->get & MQTT_RINGBUF_IDX_MASK]) 195 196 #define mqtt_ringbuf_advance_get_idx(rb, len) ((rb)->get += (len)) 197 198 199 /** 200 * Try send as many bytes as possible from output ring buffer 201 * @param rb Output ring buffer 202 * @param tpcb TCP connection handle 203 */ 204 static void 205 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb) 206 { 207 err_t err; 208 u8_t wrap = 0; 209 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb); 210 u16_t send_len = tcp_sndbuf(tpcb); 211 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL); 212 213 if (send_len == 0 || ringbuf_lin_len == 0) { 214 return; 215 } 216 217 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n", 218 send_len, ringbuf_lin_len, ((rb)->get & MQTT_RINGBUF_IDX_MASK), ((rb)->put & MQTT_RINGBUF_IDX_MASK))); 219 220 if (send_len > ringbuf_lin_len) { 221 /* Space in TCP output buffer is larger than available in ring buffer linear portion */ 222 send_len = ringbuf_lin_len; 223 /* Wrap around if more data in ring buffer after linear portion */ 224 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len); 225 } 226 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0)); 227 if ((err == ERR_OK) && wrap) { 228 mqtt_ringbuf_advance_get_idx(rb, send_len); 229 /* Use the lesser one of ring buffer linear length and TCP send buffer size */ 230 send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb)); 231 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY); 232 } 233 234 if (err == ERR_OK) { 235 mqtt_ringbuf_advance_get_idx(rb, send_len); 236 /* Flush */ 237 tcp_output(tpcb); 238 } else { 239 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err))); 240 } 241 } 242 243 244 245 /*--------------------------------------------------------------------------------------------------------------------- */ 246 /* Request queue */ 247 248 /** 249 * Create request item 250 * @param r_objs Pointer to request objects 251 * @param pkt_id Packet identifier of request 252 * @param cb Packet callback to call when requests lifetime ends 253 * @param arg Parameter following callback 254 * @return Request or NULL if failed to create 255 */ 256 static struct mqtt_request_t * 257 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg) 258 { 259 struct mqtt_request_t *r = NULL; 260 u8_t n; 261 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL); 262 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) { 263 /* Item point to itself if not in use */ 264 if (r_objs[n].next == &r_objs[n]) { 265 r = &r_objs[n]; 266 r->next = NULL; 267 r->cb = cb; 268 r->arg = arg; 269 r->pkt_id = pkt_id; 270 break; 271 } 272 } 273 return r; 274 } 275 276 277 /** 278 * Append request to pending request queue 279 * @param tail Pointer to request queue tail pointer 280 * @param r Request to append 281 */ 282 static void 283 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r) 284 { 285 struct mqtt_request_t *head = NULL; 286 s16_t time_before = 0; 287 struct mqtt_request_t *iter; 288 289 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL); 290 291 /* Iterate trough queue to find head, and count total timeout time */ 292 for (iter = *tail; iter != NULL; iter = iter->next) { 293 time_before += iter->timeout_diff; 294 head = iter; 295 } 296 297 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT); 298 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before; 299 if (head == NULL) { 300 *tail = r; 301 } else { 302 head->next = r; 303 } 304 } 305 306 307 /** 308 * Delete request item 309 * @param r Request item to delete 310 */ 311 static void 312 mqtt_delete_request(struct mqtt_request_t *r) 313 { 314 if (r != NULL) { 315 r->next = r; 316 } 317 } 318 319 /** 320 * Remove a request item with a specific packet identifier from request queue 321 * @param tail Pointer to request queue tail pointer 322 * @param pkt_id Packet identifier of request to take 323 * @return Request item if found, NULL if not 324 */ 325 static struct mqtt_request_t * 326 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id) 327 { 328 struct mqtt_request_t *iter = NULL, *prev = NULL; 329 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL); 330 /* Search all request for pkt_id */ 331 for (iter = *tail; iter != NULL; iter = iter->next) { 332 if (iter->pkt_id == pkt_id) { 333 break; 334 } 335 prev = iter; 336 } 337 338 /* If request was found */ 339 if (iter != NULL) { 340 /* unchain */ 341 if (prev == NULL) { 342 *tail= iter->next; 343 } else { 344 prev->next = iter->next; 345 } 346 /* If exists, add remaining timeout time for the request to next */ 347 if (iter->next != NULL) { 348 iter->next->timeout_diff += iter->timeout_diff; 349 } 350 iter->next = NULL; 351 } 352 return iter; 353 } 354 355 /** 356 * Handle requests timeout 357 * @param tail Pointer to request queue tail pointer 358 * @param t Time since last call in seconds 359 */ 360 static void 361 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t) 362 { 363 struct mqtt_request_t *r = *tail; 364 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL); 365 while (t > 0 && r != NULL) { 366 if (t >= r->timeout_diff) { 367 t -= (u8_t)r->timeout_diff; 368 /* Unchain */ 369 *tail = r->next; 370 /* Notify upper layer about timeout */ 371 if (r->cb != NULL) { 372 r->cb(r->arg, ERR_TIMEOUT); 373 } 374 mqtt_delete_request(r); 375 /* Tail might be be modified in callback, so re-read it in every iteration */ 376 r = *(struct mqtt_request_t * const volatile *)tail; 377 } else { 378 r->timeout_diff -= t; 379 t = 0; 380 } 381 } 382 } 383 384 /** 385 * Free all request items 386 * @param tail Pointer to request queue tail pointer 387 */ 388 static void 389 mqtt_clear_requests(struct mqtt_request_t **tail) 390 { 391 struct mqtt_request_t *iter, *next; 392 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL); 393 for (iter = *tail; iter != NULL; iter = next) { 394 next = iter->next; 395 mqtt_delete_request(iter); 396 } 397 *tail = NULL; 398 } 399 /** 400 * Initialize all request items 401 * @param r_objs Pointer to request objects 402 */ 403 static void 404 mqtt_init_requests(struct mqtt_request_t *r_objs) 405 { 406 u8_t n; 407 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL); 408 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) { 409 /* Item pointing to itself indicates unused */ 410 r_objs[n].next = &r_objs[n]; 411 } 412 } 413 414 /*--------------------------------------------------------------------------------------------------------------------- */ 415 /* Output message build helpers */ 416 417 418 static void 419 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value) 420 { 421 mqtt_ringbuf_put(rb, value); 422 } 423 424 static 425 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value) 426 { 427 mqtt_ringbuf_put(rb, value >> 8); 428 mqtt_ringbuf_put(rb, value & 0xff); 429 } 430 431 static void 432 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length) 433 { 434 u16_t n; 435 for (n = 0; n < length; n++) { 436 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]); 437 } 438 } 439 440 static void 441 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length) 442 { 443 u16_t n; 444 mqtt_ringbuf_put(rb, length >> 8); 445 mqtt_ringbuf_put(rb, length & 0xff); 446 for (n = 0; n < length; n++) { 447 mqtt_ringbuf_put(rb, str[n]); 448 } 449 } 450 451 /** 452 * Append fixed header 453 * @param rb Output ring buffer 454 * @param msg_type see enum mqtt_message_type 455 * @param dup MQTT DUP flag 456 * @param qos MQTT QoS field 457 * @param retain MQTT retain flag 458 * @param r_length Remaining length after fixed header 459 */ 460 461 static void 462 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup, 463 u8_t qos, u8_t retain, u16_t r_length) 464 { 465 /* Start with control byte */ 466 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1))); 467 /* Encode remaining length field */ 468 do { 469 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0)); 470 r_length >>= 7; 471 } while (r_length > 0); 472 } 473 474 475 /** 476 * Check output buffer space 477 * @param rb Output ring buffer 478 * @param r_length Remaining length after fixed header 479 * @return 1 if message will fit, 0 if not enough buffer space 480 */ 481 static u8_t 482 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length) 483 { 484 /* Start with length of type byte + remaining length */ 485 u16_t total_len = 1 + r_length; 486 487 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL); 488 489 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/ 490 do { 491 total_len++; 492 r_length >>= 7; 493 } while (r_length > 0); 494 495 return (total_len <= mqtt_ringbuf_free(rb)); 496 } 497 498 499 /** 500 * Close connection to server 501 * @param client MQTT client 502 * @param reason Reason for disconnection 503 */ 504 static void 505 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason) 506 { 507 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL); 508 509 /* Bring down TCP connection if not already done */ 510 if (client->conn != NULL) { 511 err_t res; 512 tcp_recv(client->conn, NULL); 513 tcp_err(client->conn, NULL); 514 tcp_sent(client->conn, NULL); 515 res = tcp_close(client->conn); 516 if (res != ERR_OK) { 517 tcp_abort(client->conn); 518 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res))); 519 } 520 client->conn = NULL; 521 } 522 523 /* Remove all pending requests */ 524 mqtt_clear_requests(&client->pend_req_queue); 525 /* Stop cyclic timer */ 526 sys_untimeout(mqtt_cyclic_timer, client); 527 528 /* Notify upper layer of disconnection if changed state */ 529 if (client->conn_state != TCP_DISCONNECTED) { 530 531 client->conn_state = TCP_DISCONNECTED; 532 if (client->connect_cb != NULL) { 533 client->connect_cb(client, client->connect_arg, reason); 534 } 535 } 536 } 537 538 539 /** 540 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states 541 * @param arg MQTT client 542 */ 543 static void 544 mqtt_cyclic_timer(void *arg) 545 { 546 u8_t restart_timer = 1; 547 mqtt_client_t *client = (mqtt_client_t *)arg; 548 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL); 549 550 if (client->conn_state == MQTT_CONNECTING) { 551 client->cyclic_tick++; 552 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) { 553 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: CONNECT attempt to server timed out\n")); 554 /* Disconnect TCP */ 555 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 556 restart_timer = 0; 557 } 558 } else if (client->conn_state == MQTT_CONNECTED) { 559 /* Handle timeout for pending requests */ 560 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL); 561 562 /* keep_alive > 0 means keep alive functionality shall be used */ 563 if (client->keep_alive > 0) { 564 565 client->server_watchdog++; 566 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */ 567 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive/2)) { 568 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Server incoming keep-alive timeout\n")); 569 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 570 restart_timer = 0; 571 } 572 573 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */ 574 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) { 575 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: Sending keep-alive message to server\n")); 576 if (mqtt_output_check_space(&client->output, 0) != 0) { 577 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0); 578 client->cyclic_tick = 0; 579 } 580 } else { 581 client->cyclic_tick++; 582 } 583 } 584 } else { 585 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state)); 586 restart_timer = 0; 587 } 588 if (restart_timer) { 589 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, arg); 590 } 591 } 592 593 594 /** 595 * Send PUBACK, PUBREC or PUBREL response message 596 * @param client MQTT client 597 * @param msg PUBACK, PUBREC or PUBREL 598 * @param pkt_id Packet identifier 599 * @param qos QoS value 600 * @return ERR_OK if successful, ERR_MEM if out of memory 601 */ 602 static err_t 603 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos) 604 { 605 err_t err = ERR_OK; 606 if (mqtt_output_check_space(&client->output, 2)) { 607 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2); 608 mqtt_output_append_u16(&client->output, pkt_id); 609 mqtt_output_send(&client->output, client->conn); 610 } else { 611 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n", 612 mqtt_msg_type_to_str(msg), pkt_id)); 613 err = ERR_MEM; 614 } 615 return err; 616 } 617 618 /** 619 * Subscribe response from server 620 * @param r Matching request 621 * @param result Result code from server 622 */ 623 static void 624 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result) 625 { 626 if (r->cb != NULL) { 627 r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT); 628 } 629 } 630 631 632 /** 633 * Complete MQTT message received or buffer full 634 * @param client MQTT client 635 * @param fixed_hdr_idx header index 636 * @param length length received part 637 * @param remaining_length Remaining length of complete message 638 */ 639 static mqtt_connection_status_t 640 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length) 641 { 642 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; 643 644 u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx; 645 646 /* Control packet type */ 647 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); 648 u16_t pkt_id = 0; 649 650 if (pkt_type == MQTT_MSG_TYPE_CONNACK) { 651 if (client->conn_state == MQTT_CONNECTING) { 652 /* Get result code from CONNACK */ 653 res = (mqtt_connection_status_t)var_hdr_payload[1]; 654 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res)); 655 if (res == MQTT_CONNECT_ACCEPTED) { 656 /* Reset cyclic_tick when changing to connected state */ 657 client->cyclic_tick = 0; 658 client->conn_state = MQTT_CONNECTED; 659 /* Notify upper layer */ 660 if (client->connect_cb != 0) { 661 client->connect_cb(client, client->connect_arg, res); 662 } 663 } 664 } else { 665 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Received CONNACK in connected state\n")); 666 } 667 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) { 668 LWIP_DEBUGF(MQTT_DEBUG_TRACE,( "mqtt_message_received: Received PINGRESP from server\n")); 669 670 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) { 671 u16_t payload_offset = 0; 672 u16_t payload_length = length; 673 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]); 674 675 if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) { 676 /* Should have topic and pkt id*/ 677 uint8_t *topic; 678 uint16_t after_topic; 679 u8_t bkp; 680 u16_t topic_len = var_hdr_payload[0]; 681 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]); 682 683 topic = var_hdr_payload + 2; 684 after_topic = 2 + topic_len; 685 /* Check length, add one byte even for QoS 0 so that zero termination will fit */ 686 if ((after_topic + (qos? 2 : 1)) > length) { 687 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n")); 688 goto out_disconnect; 689 } 690 691 /* id for QoS 1 and 2 */ 692 if (qos > 0) { 693 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1]; 694 after_topic += 2; 695 } else { 696 client->inpub_pkt_id = 0; 697 } 698 /* Take backup of byte after topic */ 699 bkp = topic[topic_len]; 700 /* Zero terminate string */ 701 topic[topic_len] = 0; 702 /* Payload data remaining in receive buffer */ 703 payload_length = length - after_topic; 704 payload_offset = after_topic; 705 706 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n", 707 qos, topic, remaining_length + payload_length)); 708 if (client->pub_cb != NULL) { 709 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length); 710 } 711 /* Restore byte after topic */ 712 topic[topic_len] = bkp; 713 } 714 if (payload_length > 0 || remaining_length == 0) { 715 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0); 716 /* Reply if QoS > 0 */ 717 if (remaining_length == 0 && qos > 0) { 718 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */ 719 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC; 720 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n", 721 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id)); 722 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0); 723 } 724 } 725 } else { 726 /* Get packet identifier */ 727 pkt_id = (u16_t)var_hdr_payload[0] << 8; 728 pkt_id |= (u16_t)var_hdr_payload[1]; 729 if (pkt_id == 0) { 730 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Got message with illegal packet identifier: 0\n")); 731 goto out_disconnect; 732 } 733 if (pkt_type == MQTT_MSG_TYPE_PUBREC) { 734 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",pkt_id)); 735 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1); 736 737 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) { 738 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",pkt_id)); 739 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0); 740 741 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK || 742 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) { 743 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id); 744 if (r != NULL) { 745 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 746 if (pkt_type == MQTT_MSG_TYPE_SUBACK) { 747 if (length < 3) { 748 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: To small SUBACK packet\n")); 749 goto out_disconnect; 750 } else { 751 mqtt_incomming_suback(r, var_hdr_payload[2]); 752 } 753 } else if (r->cb != NULL) { 754 r->cb(r->arg, ERR_OK); 755 } 756 mqtt_delete_request(r); 757 } else { 758 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 759 } 760 } else { 761 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received unknown message type: %d\n", pkt_type)); 762 goto out_disconnect; 763 } 764 } 765 return res; 766 out_disconnect: 767 return MQTT_CONNECT_DISCONNECTED; 768 } 769 770 771 /** 772 * MQTT incoming message parser 773 * @param client MQTT client 774 * @param p PBUF chain of received data 775 * @return Connection status 776 */ 777 static mqtt_connection_status_t 778 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) 779 { 780 u16_t in_offset = 0; 781 u32_t msg_rem_len = 0; 782 u8_t fixed_hdr_idx = 0; 783 u8_t b = 0; 784 785 while (p->tot_len > in_offset) { 786 if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) { 787 788 if (fixed_hdr_idx < client->msg_idx) { 789 b = client->rx_buffer[fixed_hdr_idx]; 790 } else { 791 b = pbuf_get_at(p, in_offset++); 792 client->rx_buffer[client->msg_idx++] = b; 793 } 794 fixed_hdr_idx++; 795 796 if (fixed_hdr_idx >= 2) { 797 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7); 798 if ((b & 0x80) == 0) { 799 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len)); 800 if (msg_rem_len == 0) { 801 /* Complete message with no extra headers of payload received */ 802 mqtt_message_received(client, fixed_hdr_idx, 0, 0); 803 client->msg_idx = 0; 804 fixed_hdr_idx = 0; 805 } else { 806 /* Bytes remaining in message */ 807 msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx; 808 } 809 } 810 } 811 } else { 812 u16_t cpy_len, cpy_start, buffer_space; 813 814 cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx; 815 816 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */ 817 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len); 818 819 /* Limit to available space in buffer */ 820 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start; 821 if (cpy_len > buffer_space) { 822 cpy_len = buffer_space; 823 } 824 pbuf_copy_partial(p, client->rx_buffer+cpy_start, cpy_len, in_offset); 825 826 /* Advance get and put indexes */ 827 client->msg_idx += cpy_len; 828 in_offset += cpy_len; 829 msg_rem_len -= cpy_len; 830 831 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len)); 832 if (msg_rem_len == 0 || cpy_len == buffer_space) { 833 /* Whole message received or buffer is full */ 834 mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len); 835 if (res != MQTT_CONNECT_ACCEPTED) { 836 return res; 837 } 838 if (msg_rem_len == 0) { 839 /* Reset parser state */ 840 client->msg_idx = 0; 841 /* msg_tot_len = 0; */ 842 fixed_hdr_idx = 0; 843 } 844 } 845 } 846 } 847 return MQTT_CONNECT_ACCEPTED; 848 } 849 850 851 /** 852 * TCP received callback function. @see tcp_recv_fn 853 * @param arg MQTT client 854 * @param p PBUF chain of received data 855 * @param err Passed as return value if not ERR_OK 856 * @return ERR_OK or err passed into callback 857 */ 858 static err_t 859 mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err) 860 { 861 mqtt_client_t *client = (mqtt_client_t *)arg; 862 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL); 863 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb); 864 865 if (p == NULL) { 866 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n")); 867 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 868 } else { 869 mqtt_connection_status_t res; 870 if (err != ERR_OK) { 871 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_recv_cb: Recv err=%d\n", err)); 872 pbuf_free(p); 873 return err; 874 } 875 876 /* Tell remote that data has been received */ 877 tcp_recved(pcb, p->tot_len); 878 res = mqtt_parse_incoming(client, p); 879 pbuf_free(p); 880 881 if (res != MQTT_CONNECT_ACCEPTED) { 882 mqtt_close(client, res); 883 } 884 /* If keep alive functionality is used */ 885 if (client->keep_alive != 0) { 886 /* Reset server alive watchdog */ 887 client->server_watchdog = 0; 888 } 889 890 } 891 return ERR_OK; 892 } 893 894 895 /** 896 * TCP data sent callback function. @see tcp_sent_fn 897 * @param arg MQTT client 898 * @param tpcb TCP connection handle 899 * @param len Number of bytes sent 900 * @return ERR_OK 901 */ 902 static err_t 903 mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len) 904 { 905 mqtt_client_t *client = (mqtt_client_t *)arg; 906 907 LWIP_UNUSED_ARG(tpcb); 908 LWIP_UNUSED_ARG(len); 909 910 if (client->conn_state == MQTT_CONNECTED) { 911 struct mqtt_request_t *r; 912 913 /* Reset keep-alive send timer and server watchdog */ 914 client->cyclic_tick = 0; 915 client->server_watchdog = 0; 916 /* QoS 0 publish has no response from server, so call its callbacks here */ 917 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) { 918 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n")); 919 if (r->cb != NULL) { 920 r->cb(r->arg, ERR_OK); 921 } 922 mqtt_delete_request(r); 923 } 924 /* Try send any remaining buffers from output queue */ 925 mqtt_output_send(&client->output, client->conn); 926 } 927 return ERR_OK; 928 } 929 930 /** 931 * TCP error callback function. @see tcp_err_fn 932 * @param arg MQTT client 933 * @param err Error encountered 934 */ 935 static void 936 mqtt_tcp_err_cb(void *arg, err_t err) 937 { 938 mqtt_client_t *client = (mqtt_client_t *)arg; 939 LWIP_UNUSED_ARG(err); /* only used for debug output */ 940 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg)); 941 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL); 942 /* Set conn to null before calling close as pcb is already deallocated*/ 943 client->conn = 0; 944 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 945 } 946 947 /** 948 * TCP poll callback function. @see tcp_poll_fn 949 * @param arg MQTT client 950 * @param tpcb TCP connection handle 951 * @return err ERR_OK 952 */ 953 static err_t 954 mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb) 955 { 956 mqtt_client_t *client = (mqtt_client_t *)arg; 957 if (client->conn_state == MQTT_CONNECTED) { 958 /* Try send any remaining buffers from output queue */ 959 mqtt_output_send(&client->output, tpcb); 960 } 961 return ERR_OK; 962 } 963 964 /** 965 * TCP connect callback function. @see tcp_connected_fn 966 * @param arg MQTT client 967 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error 968 * @return ERR_OK 969 */ 970 static err_t 971 mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err) 972 { 973 mqtt_client_t* client = (mqtt_client_t *)arg; 974 975 if (err != ERR_OK) { 976 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err)); 977 return err; 978 } 979 980 /* Initiate receiver state */ 981 client->msg_idx = 0; 982 983 /* Setup TCP callbacks */ 984 tcp_recv(tpcb, mqtt_tcp_recv_cb); 985 tcp_sent(tpcb, mqtt_tcp_sent_cb); 986 tcp_poll(tpcb, mqtt_tcp_poll_cb, 2); 987 988 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n")); 989 /* Enter MQTT connect state */ 990 client->conn_state = MQTT_CONNECTING; 991 992 /* Start cyclic timer */ 993 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, client); 994 client->cyclic_tick = 0; 995 996 /* Start transmission from output queue, connect message is the first one out*/ 997 mqtt_output_send(&client->output, client->conn); 998 999 return ERR_OK; 1000 } 1001 1002 1003 1004 /*---------------------------------------------------------------------------------------------------- */ 1005 /* Public API */ 1006 1007 1008 /** 1009 * @ingroup mqtt 1010 * MQTT publish function. 1011 * @param client MQTT client 1012 * @param topic Publish topic string 1013 * @param payload Data to publish (NULL is allowed) 1014 * @param payload_length: Length of payload (0 is allowed) 1015 * @param qos Quality of service, 0 1 or 2 1016 * @param retain MQTT retain flag 1017 * @param cb Callback to call when publish is complete or has timed out 1018 * @param arg User supplied argument to publish callback 1019 * @return ERR_OK if successful 1020 * ERR_CONN if client is disconnected 1021 * ERR_MEM if short on memory 1022 */ 1023 err_t 1024 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, 1025 mqtt_request_cb_t cb, void *arg) 1026 { 1027 struct mqtt_request_t *r; 1028 u16_t pkt_id; 1029 size_t topic_strlen; 1030 size_t total_len; 1031 u16_t topic_len; 1032 u16_t remaining_length; 1033 1034 LWIP_ASSERT("mqtt_publish: client != NULL", client); 1035 LWIP_ASSERT("mqtt_publish: topic != NULL", topic); 1036 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN); 1037 1038 topic_strlen = strlen(topic); 1039 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 1040 topic_len = (u16_t)topic_strlen; 1041 total_len = 2 + topic_len + payload_length; 1042 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 1043 remaining_length = (u16_t)total_len; 1044 1045 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic)); 1046 1047 if (qos > 0) { 1048 remaining_length += 2; 1049 /* Generate pkt_id id for QoS1 and 2 */ 1050 pkt_id = msg_generate_packet_id(client); 1051 } else { 1052 /* Use reserved value pkt_id 0 for QoS 0 in request handle */ 1053 pkt_id = 0; 1054 } 1055 1056 r = mqtt_create_request(client->req_list, pkt_id, cb, arg); 1057 if (r == NULL) { 1058 return ERR_MEM; 1059 } 1060 1061 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 1062 mqtt_delete_request(r); 1063 return ERR_MEM; 1064 } 1065 /* Append fixed header */ 1066 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length); 1067 1068 /* Append Topic */ 1069 mqtt_output_append_string(&client->output, topic, topic_len); 1070 1071 /* Append packet if for QoS 1 and 2*/ 1072 if (qos > 0) { 1073 mqtt_output_append_u16(&client->output, pkt_id); 1074 } 1075 1076 /* Append optional publish payload */ 1077 if ((payload != NULL) && (payload_length > 0)) { 1078 mqtt_output_append_buf(&client->output, payload, payload_length); 1079 } 1080 1081 mqtt_append_request(&client->pend_req_queue, r); 1082 mqtt_output_send(&client->output, client->conn); 1083 return ERR_OK; 1084 } 1085 1086 1087 /** 1088 * @ingroup mqtt 1089 * MQTT subscribe/unsubscribe function. 1090 * @param client MQTT client 1091 * @param topic topic to subscribe to 1092 * @param qos Quality of service, 0 1 or 2 (only used for subscribe) 1093 * @param cb Callback to call when subscribe/unsubscribe reponse is received 1094 * @param arg User supplied argument to publish callback 1095 * @param sub 1 for subscribe, 0 for unsubscribe 1096 * @return ERR_OK if successful, @see err_t enum for other results 1097 */ 1098 err_t 1099 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub) 1100 { 1101 size_t topic_strlen; 1102 size_t total_len; 1103 u16_t topic_len; 1104 u16_t remaining_length; 1105 u16_t pkt_id; 1106 struct mqtt_request_t *r; 1107 1108 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client); 1109 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic); 1110 1111 topic_strlen = strlen(topic); 1112 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 1113 topic_len = (u16_t)topic_strlen; 1114 /* Topic string, pkt_id, qos for subscribe */ 1115 total_len = topic_len + 2 + 2 + (sub != 0); 1116 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 1117 remaining_length = (u16_t)total_len; 1118 1119 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3); 1120 if (client->conn_state == TCP_DISCONNECTED) { 1121 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n")); 1122 return ERR_CONN; 1123 } 1124 1125 pkt_id = msg_generate_packet_id(client); 1126 r = mqtt_create_request(client->req_list, pkt_id, cb, arg); 1127 if (r == NULL) { 1128 return ERR_MEM; 1129 } 1130 1131 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 1132 mqtt_delete_request(r); 1133 return ERR_MEM; 1134 } 1135 1136 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id)); 1137 1138 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length); 1139 /* Packet id */ 1140 mqtt_output_append_u16(&client->output, pkt_id); 1141 /* Topic */ 1142 mqtt_output_append_string(&client->output, topic, topic_len); 1143 /* QoS */ 1144 if (sub != 0) { 1145 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2)); 1146 } 1147 1148 mqtt_append_request(&client->pend_req_queue, r); 1149 mqtt_output_send(&client->output, client->conn); 1150 return ERR_OK; 1151 } 1152 1153 1154 /** 1155 * @ingroup mqtt 1156 * Set callback to handle incoming publish requests from server 1157 * @param client MQTT client 1158 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload 1159 * @param data_cb Callback for each fragment of payload that arrives 1160 * @param arg User supplied argument to both callbacks 1161 */ 1162 void 1163 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb, 1164 mqtt_incoming_data_cb_t data_cb, void *arg) 1165 { 1166 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL); 1167 client->data_cb = data_cb; 1168 client->pub_cb = pub_cb; 1169 client->inpub_arg = arg; 1170 } 1171 1172 /** 1173 * @ingroup mqtt 1174 * Create a new MQTT client instance 1175 * @return Pointer to instance on success, NULL otherwise 1176 */ 1177 mqtt_client_t * 1178 mqtt_client_new(void) 1179 { 1180 mqtt_client_t *client = (mqtt_client_t *)mem_malloc(sizeof(mqtt_client_t)); 1181 if (client != NULL) { 1182 memset(client, 0, sizeof(mqtt_client_t)); 1183 } 1184 return client; 1185 } 1186 1187 1188 /** 1189 * @ingroup mqtt 1190 * Connect to MQTT server 1191 * @param client MQTT client 1192 * @param ip_addr Server IP 1193 * @param port Server port 1194 * @param cb Connection state change callback 1195 * @param arg User supplied argument to connection callback 1196 * @param client_info Client identification and connection options 1197 * @return ERR_OK if successful, @see err_t enum for other results 1198 */ 1199 err_t 1200 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg, 1201 const struct mqtt_connect_client_info_t *client_info) 1202 { 1203 err_t err; 1204 size_t len; 1205 u16_t client_id_length; 1206 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */ 1207 u16_t remaining_length = 2 + 4 + 1 + 1 + 2; 1208 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0; 1209 1210 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL); 1211 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL); 1212 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL); 1213 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL); 1214 1215 if (client->conn_state != TCP_DISCONNECTED) { 1216 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n")); 1217 return ERR_ISCONN; 1218 } 1219 1220 /* Wipe clean */ 1221 memset(client, 0, sizeof(mqtt_client_t)); 1222 client->connect_arg = arg; 1223 client->connect_cb = cb; 1224 client->keep_alive = client_info->keep_alive; 1225 mqtt_init_requests(client->req_list); 1226 1227 /* Build connect message */ 1228 if (client_info->will_topic != NULL && client_info->will_msg != NULL) { 1229 flags |= MQTT_CONNECT_FLAG_WILL; 1230 flags |= (client_info->will_qos & 3) << 3; 1231 if (client_info->will_retain) { 1232 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; 1233 } 1234 len = strlen(client_info->will_topic); 1235 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL); 1236 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL); 1237 will_topic_len = (u8_t)len; 1238 len = strlen(client_info->will_msg); 1239 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL); 1240 will_msg_len = (u8_t)len; 1241 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len; 1242 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1243 remaining_length = (u16_t)len; 1244 } 1245 1246 /* Don't complicate things, always connect using clean session */ 1247 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; 1248 1249 len = strlen(client_info->client_id); 1250 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL); 1251 client_id_length = (u16_t)len; 1252 len = remaining_length + 2 + client_id_length; 1253 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1254 remaining_length = (u16_t)len; 1255 1256 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 1257 return ERR_MEM; 1258 } 1259 1260 client->conn = tcp_new(); 1261 if (client->conn == NULL) { 1262 return ERR_MEM; 1263 } 1264 1265 /* Set arg pointer for callbacks */ 1266 tcp_arg(client->conn, client); 1267 /* Any local address, pick random local port number */ 1268 err = tcp_bind(client->conn, IP_ADDR_ANY, 0); 1269 if (err != ERR_OK) { 1270 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err)); 1271 goto tcp_fail; 1272 } 1273 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port)); 1274 1275 /* Connect to server */ 1276 err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb); 1277 if (err != ERR_OK) { 1278 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err)); 1279 goto tcp_fail; 1280 } 1281 /* Set error callback */ 1282 tcp_err(client->conn, mqtt_tcp_err_cb); 1283 client->conn_state = TCP_CONNECTING; 1284 1285 /* Append fixed header */ 1286 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length); 1287 /* Append Protocol string */ 1288 mqtt_output_append_string(&client->output, "MQTT", 4); 1289 /* Append Protocol level */ 1290 mqtt_output_append_u8(&client->output, 4); 1291 /* Append connect flags */ 1292 mqtt_output_append_u8(&client->output, flags); 1293 /* Append keep-alive */ 1294 mqtt_output_append_u16(&client->output, client_info->keep_alive); 1295 /* Append client id */ 1296 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length); 1297 /* Append will message if used */ 1298 if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) { 1299 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len); 1300 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len); 1301 } 1302 return ERR_OK; 1303 1304 tcp_fail: 1305 tcp_abort(client->conn); 1306 client->conn = NULL; 1307 return err; 1308 } 1309 1310 1311 /** 1312 * @ingroup mqtt 1313 * Disconnect from MQTT server 1314 * @param client MQTT client 1315 */ 1316 void 1317 mqtt_disconnect(mqtt_client_t *client) 1318 { 1319 LWIP_ASSERT("mqtt_disconnect: client != NULL", client); 1320 /* If connection in not already closed */ 1321 if (client->conn_state != TCP_DISCONNECTED) { 1322 /* Set conn_state before calling mqtt_close to prevent callback from being called */ 1323 client->conn_state = TCP_DISCONNECTED; 1324 mqtt_close(client, (mqtt_connection_status_t)0); 1325 } 1326 } 1327 1328 /** 1329 * @ingroup mqtt 1330 * Check connection with server 1331 * @param client MQTT client 1332 * @return 1 if connected to server, 0 otherwise 1333 */ 1334 u8_t 1335 mqtt_client_is_connected(mqtt_client_t *client) 1336 { 1337 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client); 1338 return client->conn_state == MQTT_CONNECTED; 1339 } 1340 1341 #endif /* LWIP_TCP && LWIP_CALLBACK_API */ 1342