xref: /minix/minix/lib/liblwip/dist/src/apps/mqtt/mqtt.c (revision fb9c64b2)
1 /**
2  * @file
3  * MQTT client
4  *
5  * @defgroup mqtt MQTT client
6  * @ingroup apps
7  * @verbinclude mqtt_client.txt
8  */
9 
10 /*
11  * Copyright (c) 2016 Erik Andersson <erian747@gmail.com>
12  * All rights reserved.
13  *
14  * Redistribution and use in source and binary forms, with or without modification,
15  * are permitted provided that the following conditions are met:
16  *
17  * 1. Redistributions of source code must retain the above copyright notice,
18  *    this list of conditions and the following disclaimer.
19  * 2. Redistributions in binary form must reproduce the above copyright notice,
20  *    this list of conditions and the following disclaimer in the documentation
21  *    and/or other materials provided with the distribution.
22  * 3. The name of the author may not be used to endorse or promote products
23  *    derived from this software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
26  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
27  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
28  * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
30  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
33  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
34  * OF SUCH DAMAGE.
35  *
36  * This file is part of the lwIP TCP/IP stack
37  *
38  * Author: Erik Andersson <erian747@gmail.com>
39  *
40  *
41  * @todo:
42  * - Handle large outgoing payloads for PUBLISH messages
43  * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
44  * - Add support for legacy MQTT protocol version
45  *
46  * Please coordinate changes and requests with Erik Andersson
47  * Erik Andersson <erian747@gmail.com>
48  *
49  */
50 #include "lwip/apps/mqtt.h"
51 #include "lwip/timeouts.h"
52 #include "lwip/ip_addr.h"
53 #include "lwip/mem.h"
54 #include "lwip/err.h"
55 #include "lwip/pbuf.h"
56 #include "lwip/tcp.h"
57 #include <string.h>
58 
59 #if LWIP_TCP && LWIP_CALLBACK_API
60 
/**
 * MQTT_DEBUG: Debug output selector of the MQTT client. Default is off.
 */
#if !defined(MQTT_DEBUG) || defined(__DOXYGEN__)
#define MQTT_DEBUG                  LWIP_DBG_OFF
#endif

/* Debug type/level combinations handed to LWIP_DEBUGF() throughout this file */
#define MQTT_DEBUG_TRACE        (MQTT_DEBUG | LWIP_DBG_TRACE)
#define MQTT_DEBUG_STATE        (MQTT_DEBUG | LWIP_DBG_STATE)
#define MQTT_DEBUG_WARN         (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
#define MQTT_DEBUG_WARN_STATE   (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
#define MQTT_DEBUG_SERIOUS      (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
73 
74 static void mqtt_cyclic_timer(void *arg);
75 
/**
 * MQTT client connection states.
 * The cyclic timer only expects to run in MQTT_CONNECTING and MQTT_CONNECTED;
 * mqtt_close() moves the client back to TCP_DISCONNECTED.
 */
enum {
  TCP_DISCONNECTED = 0,
  TCP_CONNECTING   = 1,
  MQTT_CONNECTING  = 2, /* waiting for CONNACK */
  MQTT_CONNECTED   = 3  /* CONNACK with "accepted" result received */
};
85 
/**
 * MQTT control message types.
 * The value is what is carried in the upper four bits of the first
 * fixed header byte (see MQTT_CTL_PACKET_TYPE).
 */
enum mqtt_message_type {
  MQTT_MSG_TYPE_CONNECT     =  1,
  MQTT_MSG_TYPE_CONNACK     =  2,
  MQTT_MSG_TYPE_PUBLISH     =  3,
  MQTT_MSG_TYPE_PUBACK      =  4,
  MQTT_MSG_TYPE_PUBREC      =  5,
  MQTT_MSG_TYPE_PUBREL      =  6,
  MQTT_MSG_TYPE_PUBCOMP     =  7,
  MQTT_MSG_TYPE_SUBSCRIBE   =  8,
  MQTT_MSG_TYPE_SUBACK      =  9,
  MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
  MQTT_MSG_TYPE_UNSUBACK    = 11,
  MQTT_MSG_TYPE_PINGREQ     = 12,
  MQTT_MSG_TYPE_PINGRESP    = 13,
  MQTT_MSG_TYPE_DISCONNECT  = 14
};
105 
/**
 * Helpers to extract control packet type and qos from first byte in fixed header.
 * The argument is parenthesized so that an expression (e.g. "a | b") can be
 * passed without operator precedence changing the result.
 */
#define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0xf0) >> 4)
#define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0x6) >> 1)
109 
/**
 * MQTT connect flags, only used in CONNECT message.
 * Each constant is a single-bit mask.
 */
enum mqtt_connect_flag {
  MQTT_CONNECT_FLAG_USERNAME      = 0x80, /* bit 7 */
  MQTT_CONNECT_FLAG_PASSWORD      = 0x40, /* bit 6 */
  MQTT_CONNECT_FLAG_WILL_RETAIN   = 0x20, /* bit 5 */
  MQTT_CONNECT_FLAG_WILL          = 0x04, /* bit 2 */
  MQTT_CONNECT_FLAG_CLEAN_SESSION = 0x02  /* bit 1 */
};
120 
121 
#if defined(LWIP_DEBUG)
/** Printable control message names, indexed by enum mqtt_message_type (index 0 has no message type) */
static const char * const mqtt_message_type_str[15] =
{
  "UNDEFINED",
  "CONNECT",
  "CONNACK",
  "PUBLISH",
  "PUBACK",
  "PUBREC",
  "PUBREL",
  "PUBCOMP",
  "SUBSCRIBE",
  "SUBACK",
  "UNSUBSCRIBE",
  "UNSUBACK",
  "PINGREQ",
  "PINGRESP",
  "DISCONNECT"
};

/**
 * Translate a message type value into its name (debug builds only)
 * @param msg_type see enum mqtt_message_type
 *
 * @return Control message type text string, "UNDEFINED" for values outside the table
 */
static const char *
mqtt_msg_type_to_str(u8_t msg_type)
{
  /* Out-of-range values are mapped to entry 0 */
  const u8_t idx = (msg_type < LWIP_ARRAYSIZE(mqtt_message_type_str)) ? msg_type : 0;
  return mqtt_message_type_str[idx];
}

#endif /* LWIP_DEBUG */
158 
159 
160 /**
161  * Generate MQTT packet identifier
162  * @param client MQTT client
163  * @return New packet identifier, range 1 to 65535
164  */
165 static u16_t
166 msg_generate_packet_id(mqtt_client_t *client)
167 {
168   client->pkt_id_seq++;
169   if (client->pkt_id_seq == 0) {
170     client->pkt_id_seq++;
171   }
172   return client->pkt_id_seq;
173 }
174 
175 /*--------------------------------------------------------------------------------------------------------------------- */
176 /* Output ring buffer */
177 
178 
/*
 * The put and get members are free-running counters; the buffer position is
 * derived by masking them with MQTT_RINGBUF_IDX_MASK. The masking only yields
 * valid positions when MQTT_OUTPUT_RINGBUF_SIZE is a power of two.
 * All macros are fully parenthesized so they can be used inside larger
 * expressions and with a size macro that is itself an expression.
 */
#define MQTT_RINGBUF_IDX_MASK ((MQTT_OUTPUT_RINGBUF_SIZE) - 1)

/** Add single item to ring buffer */
#define mqtt_ringbuf_put(rb, item) (((rb)->buf)[(rb)->put++ & MQTT_RINGBUF_IDX_MASK] = (item))

/** Return number of bytes in ring buffer */
#define mqtt_ringbuf_len(rb) ((u16_t)((rb)->put - (rb)->get))

/** Return number of bytes free in ring buffer */
#define mqtt_ringbuf_free(rb) ((MQTT_OUTPUT_RINGBUF_SIZE) - mqtt_ringbuf_len(rb))

/** Return number of bytes possible to read without wrapping around */
#define mqtt_ringbuf_linear_read_length(rb) (LWIP_MIN(mqtt_ringbuf_len(rb), ((MQTT_OUTPUT_RINGBUF_SIZE) - ((rb)->get & MQTT_RINGBUF_IDX_MASK))))

/** Return pointer to ring buffer get position */
#define mqtt_ringbuf_get_ptr(rb) (&(rb)->buf[(rb)->get & MQTT_RINGBUF_IDX_MASK])

/** Advance the get counter after len bytes have been consumed */
#define mqtt_ringbuf_advance_get_idx(rb, len) ((rb)->get += (len))
197 
198 
199 /**
200  * Try send as many bytes as possible from output ring buffer
201  * @param rb Output ring buffer
202  * @param tpcb TCP connection handle
203  */
204 static void
205 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
206 {
207   err_t err;
208   u8_t wrap = 0;
209   u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
210   u16_t send_len = tcp_sndbuf(tpcb);
211   LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
212 
213   if (send_len == 0 || ringbuf_lin_len == 0) {
214     return;
215   }
216 
217   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
218                                 send_len, ringbuf_lin_len, ((rb)->get & MQTT_RINGBUF_IDX_MASK), ((rb)->put & MQTT_RINGBUF_IDX_MASK)));
219 
220   if (send_len > ringbuf_lin_len) {
221     /* Space in TCP output buffer is larger than available in ring buffer linear portion */
222     send_len = ringbuf_lin_len;
223     /* Wrap around if more data in ring buffer after linear portion */
224     wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
225   }
226   err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
227   if ((err == ERR_OK) && wrap) {
228     mqtt_ringbuf_advance_get_idx(rb, send_len);
229     /* Use the lesser one of ring buffer linear length and TCP send buffer size */
230     send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
231     err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
232   }
233 
234   if (err == ERR_OK) {
235     mqtt_ringbuf_advance_get_idx(rb, send_len);
236     /* Flush */
237     tcp_output(tpcb);
238   } else {
239     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
240   }
241 }
242 
243 
244 
245 /*--------------------------------------------------------------------------------------------------------------------- */
246 /* Request queue */
247 
248 /**
249  * Create request item
250  * @param r_objs Pointer to request objects
251  * @param pkt_id Packet identifier of request
252  * @param cb Packet callback to call when requests lifetime ends
253  * @param arg Parameter following callback
254  * @return Request or NULL if failed to create
255  */
256 static struct mqtt_request_t *
257 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
258 {
259   struct mqtt_request_t *r = NULL;
260   u8_t n;
261   LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
262   for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
263     /* Item point to itself if not in use */
264     if (r_objs[n].next == &r_objs[n]) {
265       r = &r_objs[n];
266       r->next = NULL;
267       r->cb = cb;
268       r->arg = arg;
269       r->pkt_id = pkt_id;
270       break;
271     }
272   }
273   return r;
274 }
275 
276 
277 /**
278  * Append request to pending request queue
279  * @param tail Pointer to request queue tail pointer
280  * @param r Request to append
281  */
282 static void
283 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
284 {
285   struct mqtt_request_t *head = NULL;
286   s16_t time_before = 0;
287   struct mqtt_request_t *iter;
288 
289   LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
290 
291   /* Iterate trough queue to find head, and count total timeout time */
292   for (iter = *tail; iter != NULL; iter = iter->next) {
293     time_before += iter->timeout_diff;
294     head = iter;
295   }
296 
297   LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
298   r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
299   if (head == NULL) {
300     *tail = r;
301   } else {
302     head->next = r;
303   }
304 }
305 
306 
307 /**
308  * Delete request item
309  * @param r Request item to delete
310  */
311 static void
312 mqtt_delete_request(struct mqtt_request_t *r)
313 {
314   if (r != NULL) {
315     r->next = r;
316   }
317 }
318 
319 /**
320  * Remove a request item with a specific packet identifier from request queue
321  * @param tail Pointer to request queue tail pointer
322  * @param pkt_id Packet identifier of request to take
323  * @return Request item if found, NULL if not
324  */
325 static struct mqtt_request_t *
326 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
327 {
328   struct mqtt_request_t *iter = NULL, *prev = NULL;
329   LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
330   /* Search all request for pkt_id */
331   for (iter = *tail; iter != NULL; iter = iter->next) {
332     if (iter->pkt_id == pkt_id) {
333       break;
334     }
335     prev = iter;
336   }
337 
338   /* If request was found */
339   if (iter != NULL) {
340     /* unchain */
341     if (prev == NULL) {
342       *tail= iter->next;
343     } else {
344       prev->next = iter->next;
345     }
346     /* If exists, add remaining timeout time for the request to next */
347     if (iter->next != NULL) {
348       iter->next->timeout_diff += iter->timeout_diff;
349     }
350     iter->next = NULL;
351   }
352   return iter;
353 }
354 
355 /**
356  * Handle requests timeout
357  * @param tail Pointer to request queue tail pointer
358  * @param t Time since last call in seconds
359  */
360 static void
361 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
362 {
363   struct mqtt_request_t *r = *tail;
364   LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
365   while (t > 0 && r != NULL) {
366     if (t >= r->timeout_diff) {
367       t -= (u8_t)r->timeout_diff;
368       /* Unchain */
369       *tail = r->next;
370       /* Notify upper layer about timeout */
371       if (r->cb != NULL) {
372         r->cb(r->arg, ERR_TIMEOUT);
373       }
374       mqtt_delete_request(r);
375       /* Tail might be be modified in callback, so re-read it in every iteration */
376       r = *(struct mqtt_request_t * const volatile *)tail;
377     } else {
378       r->timeout_diff -= t;
379       t = 0;
380     }
381   }
382 }
383 
384 /**
385  * Free all request items
386  * @param tail Pointer to request queue tail pointer
387  */
388 static void
389 mqtt_clear_requests(struct mqtt_request_t **tail)
390 {
391   struct mqtt_request_t *iter, *next;
392   LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
393   for (iter = *tail; iter != NULL; iter = next) {
394     next = iter->next;
395     mqtt_delete_request(iter);
396   }
397   *tail = NULL;
398 }
399 /**
400  * Initialize all request items
401  * @param r_objs Pointer to request objects
402  */
403 static void
404 mqtt_init_requests(struct mqtt_request_t *r_objs)
405 {
406   u8_t n;
407   LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
408   for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
409     /* Item pointing to itself indicates unused */
410     r_objs[n].next = &r_objs[n];
411   }
412 }
413 
414 /*--------------------------------------------------------------------------------------------------------------------- */
415 /* Output message build helpers */
416 
417 
418 static void
419 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
420 {
421   mqtt_ringbuf_put(rb, value);
422 }
423 
424 static
425 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
426 {
427   mqtt_ringbuf_put(rb, value >> 8);
428   mqtt_ringbuf_put(rb, value & 0xff);
429 }
430 
431 static void
432 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
433 {
434   u16_t n;
435   for (n = 0; n < length; n++) {
436     mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
437   }
438 }
439 
440 static void
441 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
442 {
443   u16_t n;
444   mqtt_ringbuf_put(rb, length >> 8);
445   mqtt_ringbuf_put(rb, length & 0xff);
446   for (n = 0; n < length; n++) {
447     mqtt_ringbuf_put(rb, str[n]);
448   }
449 }
450 
451 /**
452  * Append fixed header
453  * @param rb Output ring buffer
454  * @param msg_type see enum mqtt_message_type
455  * @param dup MQTT DUP flag
456  * @param qos MQTT QoS field
457  * @param retain MQTT retain flag
458  * @param r_length Remaining length after fixed header
459  */
460 
461 static void
462 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup,
463                  u8_t qos, u8_t retain, u16_t r_length)
464 {
465   /* Start with control byte */
466   mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1)));
467   /* Encode remaining length field */
468   do {
469     mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
470     r_length >>= 7;
471   } while (r_length > 0);
472 }
473 
474 
475 /**
476  * Check output buffer space
477  * @param rb Output ring buffer
478  * @param r_length Remaining length after fixed header
479  * @return 1 if message will fit, 0 if not enough buffer space
480  */
481 static u8_t
482 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
483 {
484   /* Start with length of type byte + remaining length */
485   u16_t total_len = 1 + r_length;
486 
487   LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
488 
489  /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
490   do {
491     total_len++;
492     r_length >>= 7;
493   } while (r_length > 0);
494 
495   return (total_len <= mqtt_ringbuf_free(rb));
496 }
497 
498 
499 /**
500  * Close connection to server
501  * @param client MQTT client
502  * @param reason Reason for disconnection
503  */
504 static void
505 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
506 {
507   LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
508 
509   /* Bring down TCP connection if not already done */
510   if (client->conn != NULL) {
511     err_t res;
512     tcp_recv(client->conn, NULL);
513     tcp_err(client->conn,  NULL);
514     tcp_sent(client->conn, NULL);
515     res = tcp_close(client->conn);
516     if (res != ERR_OK) {
517       tcp_abort(client->conn);
518       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res)));
519     }
520     client->conn = NULL;
521   }
522 
523   /* Remove all pending requests */
524   mqtt_clear_requests(&client->pend_req_queue);
525   /* Stop cyclic timer */
526   sys_untimeout(mqtt_cyclic_timer, client);
527 
528   /* Notify upper layer of disconnection if changed state */
529   if (client->conn_state != TCP_DISCONNECTED) {
530 
531     client->conn_state = TCP_DISCONNECTED;
532     if (client->connect_cb != NULL) {
533       client->connect_cb(client, client->connect_arg, reason);
534     }
535   }
536 }
537 
538 
539 /**
540  * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
541  * @param arg MQTT client
542  */
543 static void
544 mqtt_cyclic_timer(void *arg)
545 {
546   u8_t restart_timer = 1;
547   mqtt_client_t *client = (mqtt_client_t *)arg;
548   LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
549 
550   if (client->conn_state == MQTT_CONNECTING) {
551     client->cyclic_tick++;
552     if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
553       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
554       /* Disconnect TCP */
555       mqtt_close(client, MQTT_CONNECT_TIMEOUT);
556       restart_timer = 0;
557     }
558   } else if (client->conn_state == MQTT_CONNECTED) {
559     /* Handle timeout for pending requests */
560     mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
561 
562     /* keep_alive > 0 means keep alive functionality shall be used */
563     if (client->keep_alive > 0) {
564 
565       client->server_watchdog++;
566       /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
567       if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive/2)) {
568         LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
569         mqtt_close(client, MQTT_CONNECT_TIMEOUT);
570         restart_timer = 0;
571       }
572 
573       /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
574       if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
575         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
576         if (mqtt_output_check_space(&client->output, 0) != 0) {
577           mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
578           client->cyclic_tick = 0;
579         }
580       } else {
581         client->cyclic_tick++;
582       }
583     }
584   } else {
585     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
586     restart_timer = 0;
587   }
588   if (restart_timer) {
589     sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, arg);
590   }
591 }
592 
593 
594 /**
595  * Send PUBACK, PUBREC or PUBREL response message
596  * @param client MQTT client
597  * @param msg PUBACK, PUBREC or PUBREL
598  * @param pkt_id Packet identifier
599  * @param qos QoS value
600  * @return ERR_OK if successful, ERR_MEM if out of memory
601  */
602 static err_t
603 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
604 {
605   err_t err = ERR_OK;
606   if (mqtt_output_check_space(&client->output, 2)) {
607     mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
608     mqtt_output_append_u16(&client->output, pkt_id);
609     mqtt_output_send(&client->output, client->conn);
610   } else {
611     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
612                                   mqtt_msg_type_to_str(msg), pkt_id));
613     err = ERR_MEM;
614   }
615   return err;
616 }
617 
618 /**
619  * Subscribe response from server
620  * @param r Matching request
621  * @param result Result code from server
622  */
623 static void
624 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
625 {
626   if (r->cb != NULL) {
627     r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
628   }
629 }
630 
631 
632 /**
633  * Complete MQTT message received or buffer full
634  * @param client MQTT client
635  * @param fixed_hdr_idx header index
636  * @param length length received part
637  * @param remaining_length Remaining length of complete message
638  */
639 static mqtt_connection_status_t
640   mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
641 {
642   mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
643 
644   u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
645 
646   /* Control packet type */
647   u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
648   u16_t pkt_id = 0;
649 
650   if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
651     if (client->conn_state == MQTT_CONNECTING) {
652       /* Get result code from CONNACK */
653       res = (mqtt_connection_status_t)var_hdr_payload[1];
654       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res));
655       if (res == MQTT_CONNECT_ACCEPTED) {
656         /* Reset cyclic_tick when changing to connected state */
657         client->cyclic_tick = 0;
658         client->conn_state = MQTT_CONNECTED;
659         /* Notify upper layer */
660         if (client->connect_cb != 0) {
661           client->connect_cb(client, client->connect_arg, res);
662         }
663       }
664     } else {
665       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Received CONNACK in connected state\n"));
666     }
667   } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
668     LWIP_DEBUGF(MQTT_DEBUG_TRACE,( "mqtt_message_received: Received PINGRESP from server\n"));
669 
670   } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
671     u16_t payload_offset = 0;
672     u16_t payload_length = length;
673     u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
674 
675     if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
676       /* Should have topic and pkt id*/
677       uint8_t *topic;
678       uint16_t after_topic;
679       u8_t bkp;
680       u16_t topic_len = var_hdr_payload[0];
681       topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
682 
683       topic = var_hdr_payload + 2;
684       after_topic = 2 + topic_len;
685       /* Check length, add one byte even for QoS 0 so that zero termination will fit */
686       if ((after_topic + (qos? 2 : 1)) > length) {
687         LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
688         goto out_disconnect;
689       }
690 
691       /* id for QoS 1 and 2 */
692       if (qos > 0) {
693         client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
694         after_topic += 2;
695       } else {
696         client->inpub_pkt_id = 0;
697       }
698       /* Take backup of byte after topic */
699       bkp = topic[topic_len];
700       /* Zero terminate string */
701       topic[topic_len] = 0;
702       /* Payload data remaining in receive buffer */
703       payload_length = length - after_topic;
704       payload_offset = after_topic;
705 
706       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n",
707                                     qos, topic, remaining_length + payload_length));
708       if (client->pub_cb != NULL) {
709         client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
710       }
711       /* Restore byte after topic */
712       topic[topic_len] = bkp;
713     }
714     if (payload_length > 0 || remaining_length == 0) {
715       client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
716       /* Reply if QoS > 0 */
717       if (remaining_length == 0 && qos > 0) {
718         /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
719         u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
720         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
721                                       mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
722         pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
723       }
724     }
725   } else {
726     /* Get packet identifier */
727     pkt_id = (u16_t)var_hdr_payload[0] << 8;
728     pkt_id |= (u16_t)var_hdr_payload[1];
729     if (pkt_id == 0) {
730       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
731       goto out_disconnect;
732     }
733     if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
734       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",pkt_id));
735       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
736 
737     } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
738       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",pkt_id));
739       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
740 
741     } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
742               pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
743       struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
744       if (r != NULL) {
745         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
746         if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
747           if (length < 3) {
748             LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: To small SUBACK packet\n"));
749             goto out_disconnect;
750           } else {
751             mqtt_incomming_suback(r, var_hdr_payload[2]);
752           }
753         } else if (r->cb != NULL) {
754           r->cb(r->arg, ERR_OK);
755         }
756         mqtt_delete_request(r);
757       } else {
758         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
759       }
760     } else {
761       LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
762       goto out_disconnect;
763     }
764   }
765   return res;
766 out_disconnect:
767   return MQTT_CONNECT_DISCONNECTED;
768 }
769 
770 
771 /**
772  * MQTT incoming message parser
773  * @param client MQTT client
774  * @param p PBUF chain of received data
775  * @return Connection status
776  */
777 static mqtt_connection_status_t
778 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
779 {
780   u16_t in_offset = 0;
781   u32_t msg_rem_len = 0;
782   u8_t fixed_hdr_idx = 0;
783   u8_t b = 0;
784 
785   while (p->tot_len > in_offset) {
786     if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
787 
788       if (fixed_hdr_idx < client->msg_idx) {
789         b = client->rx_buffer[fixed_hdr_idx];
790       } else {
791         b = pbuf_get_at(p, in_offset++);
792         client->rx_buffer[client->msg_idx++] = b;
793       }
794       fixed_hdr_idx++;
795 
796       if (fixed_hdr_idx >= 2) {
797         msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
798         if ((b & 0x80) == 0) {
799           LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len));
800           if (msg_rem_len == 0) {
801             /* Complete message with no extra headers of payload received */
802             mqtt_message_received(client, fixed_hdr_idx, 0, 0);
803             client->msg_idx = 0;
804             fixed_hdr_idx = 0;
805           } else {
806             /* Bytes remaining in message */
807             msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
808           }
809         }
810       }
811     } else {
812       u16_t cpy_len, cpy_start, buffer_space;
813 
814       cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
815 
816       /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
817       cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
818 
819       /* Limit to available space in buffer */
820       buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
821       if (cpy_len > buffer_space) {
822         cpy_len = buffer_space;
823       }
824       pbuf_copy_partial(p, client->rx_buffer+cpy_start, cpy_len, in_offset);
825 
826       /* Advance get and put indexes  */
827       client->msg_idx += cpy_len;
828       in_offset += cpy_len;
829       msg_rem_len -= cpy_len;
830 
831       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len));
832       if (msg_rem_len == 0 || cpy_len == buffer_space) {
833         /* Whole message received or buffer is full */
834         mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
835         if (res != MQTT_CONNECT_ACCEPTED) {
836           return res;
837         }
838         if (msg_rem_len == 0) {
839           /* Reset parser state */
840           client->msg_idx = 0;
841           /* msg_tot_len = 0; */
842           fixed_hdr_idx = 0;
843         }
844       }
845     }
846   }
847   return MQTT_CONNECT_ACCEPTED;
848 }
849 
850 
851 /**
852  * TCP received callback function. @see tcp_recv_fn
853  * @param arg MQTT client
854  * @param p PBUF chain of received data
855  * @param err Passed as return value if not ERR_OK
856  * @return ERR_OK or err passed into callback
857  */
858 static err_t
859 mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
860 {
861   mqtt_client_t *client = (mqtt_client_t *)arg;
862   LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
863   LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
864 
865   if (p == NULL) {
866     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
867     mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
868   } else {
869     mqtt_connection_status_t res;
870     if (err != ERR_OK) {
871       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_recv_cb: Recv err=%d\n", err));
872       pbuf_free(p);
873       return err;
874     }
875 
876     /* Tell remote that data has been received */
877     tcp_recved(pcb, p->tot_len);
878     res = mqtt_parse_incoming(client, p);
879     pbuf_free(p);
880 
881     if (res != MQTT_CONNECT_ACCEPTED) {
882       mqtt_close(client, res);
883     }
884     /* If keep alive functionality is used */
885     if (client->keep_alive != 0) {
886       /* Reset server alive watchdog */
887       client->server_watchdog = 0;
888     }
889 
890   }
891   return ERR_OK;
892 }
893 
894 
895 /**
896  * TCP data sent callback function. @see tcp_sent_fn
897  * @param arg MQTT client
898  * @param tpcb TCP connection handle
899  * @param len Number of bytes sent
900  * @return ERR_OK
901  */
902 static err_t
903 mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
904 {
905   mqtt_client_t *client = (mqtt_client_t *)arg;
906 
907   LWIP_UNUSED_ARG(tpcb);
908   LWIP_UNUSED_ARG(len);
909 
910   if (client->conn_state == MQTT_CONNECTED) {
911     struct mqtt_request_t *r;
912 
913     /* Reset keep-alive send timer and server watchdog */
914     client->cyclic_tick = 0;
915     client->server_watchdog = 0;
916     /* QoS 0 publish has no response from server, so call its callbacks here */
917     while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
918       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
919       if (r->cb != NULL) {
920         r->cb(r->arg, ERR_OK);
921       }
922       mqtt_delete_request(r);
923     }
924     /* Try send any remaining buffers from output queue */
925     mqtt_output_send(&client->output, client->conn);
926   }
927   return ERR_OK;
928 }
929 
930 /**
931  * TCP error callback function. @see tcp_err_fn
932  * @param arg MQTT client
933  * @param err Error encountered
934  */
935 static void
936 mqtt_tcp_err_cb(void *arg, err_t err)
937 {
938   mqtt_client_t *client = (mqtt_client_t *)arg;
939   LWIP_UNUSED_ARG(err); /* only used for debug output */
940   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
941   LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
942   /* Set conn to null before calling close as pcb is already deallocated*/
943   client->conn = 0;
944   mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
945 }
946 
947 /**
948  * TCP poll callback function. @see tcp_poll_fn
949  * @param arg MQTT client
950  * @param tpcb TCP connection handle
951  * @return err ERR_OK
952  */
953 static err_t
954 mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb)
955 {
956   mqtt_client_t *client = (mqtt_client_t *)arg;
957   if (client->conn_state == MQTT_CONNECTED) {
958     /* Try send any remaining buffers from output queue */
959     mqtt_output_send(&client->output, tpcb);
960   }
961   return ERR_OK;
962 }
963 
964 /**
965  * TCP connect callback function. @see tcp_connected_fn
966  * @param arg MQTT client
967  * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
968  * @return ERR_OK
969  */
970 static err_t
971 mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
972 {
973   mqtt_client_t* client = (mqtt_client_t *)arg;
974 
975   if (err != ERR_OK) {
976     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
977     return err;
978   }
979 
980   /* Initiate receiver state */
981   client->msg_idx = 0;
982 
983   /* Setup TCP callbacks */
984   tcp_recv(tpcb, mqtt_tcp_recv_cb);
985   tcp_sent(tpcb, mqtt_tcp_sent_cb);
986   tcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
987 
988   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n"));
989   /* Enter MQTT connect state */
990   client->conn_state = MQTT_CONNECTING;
991 
992   /* Start cyclic timer */
993   sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, client);
994   client->cyclic_tick = 0;
995 
996   /* Start transmission from output queue, connect message is the first one out*/
997   mqtt_output_send(&client->output, client->conn);
998 
999   return ERR_OK;
1000 }
1001 
1002 
1003 
1004 /*---------------------------------------------------------------------------------------------------- */
1005 /* Public API */
1006 
1007 
1008 /**
1009  * @ingroup mqtt
1010  * MQTT publish function.
1011  * @param client MQTT client
1012  * @param topic Publish topic string
1013  * @param payload Data to publish (NULL is allowed)
1014  * @param payload_length: Length of payload (0 is allowed)
1015  * @param qos Quality of service, 0 1 or 2
1016  * @param retain MQTT retain flag
1017  * @param cb Callback to call when publish is complete or has timed out
1018  * @param arg User supplied argument to publish callback
1019  * @return ERR_OK if successful
1020  *         ERR_CONN if client is disconnected
1021  *         ERR_MEM if short on memory
1022  */
1023 err_t
1024 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
1025              mqtt_request_cb_t cb, void *arg)
1026 {
1027   struct mqtt_request_t *r;
1028   u16_t pkt_id;
1029   size_t topic_strlen;
1030   size_t total_len;
1031   u16_t topic_len;
1032   u16_t remaining_length;
1033 
1034   LWIP_ASSERT("mqtt_publish: client != NULL", client);
1035   LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
1036   LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
1037 
1038   topic_strlen = strlen(topic);
1039   LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1040   topic_len = (u16_t)topic_strlen;
1041   total_len = 2 + topic_len + payload_length;
1042   LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1043   remaining_length = (u16_t)total_len;
1044 
1045   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
1046 
1047   if (qos > 0) {
1048     remaining_length += 2;
1049     /* Generate pkt_id id for QoS1 and 2 */
1050     pkt_id = msg_generate_packet_id(client);
1051   } else {
1052     /* Use reserved value pkt_id 0 for QoS 0 in request handle */
1053     pkt_id = 0;
1054   }
1055 
1056   r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
1057   if (r == NULL) {
1058     return ERR_MEM;
1059   }
1060 
1061   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1062     mqtt_delete_request(r);
1063     return ERR_MEM;
1064   }
1065   /* Append fixed header */
1066   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
1067 
1068   /* Append Topic */
1069   mqtt_output_append_string(&client->output, topic, topic_len);
1070 
1071   /* Append packet if for QoS 1 and 2*/
1072   if (qos > 0) {
1073     mqtt_output_append_u16(&client->output, pkt_id);
1074   }
1075 
1076   /* Append optional publish payload */
1077   if ((payload != NULL) && (payload_length > 0)) {
1078     mqtt_output_append_buf(&client->output, payload, payload_length);
1079   }
1080 
1081   mqtt_append_request(&client->pend_req_queue, r);
1082   mqtt_output_send(&client->output, client->conn);
1083   return ERR_OK;
1084 }
1085 
1086 
1087 /**
1088  * @ingroup mqtt
1089  * MQTT subscribe/unsubscribe function.
1090  * @param client MQTT client
1091  * @param topic topic to subscribe to
1092  * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
1093  * @param cb Callback to call when subscribe/unsubscribe reponse is received
1094  * @param arg User supplied argument to publish callback
1095  * @param sub 1 for subscribe, 0 for unsubscribe
1096  * @return ERR_OK if successful, @see err_t enum for other results
1097  */
1098 err_t
1099 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
1100 {
1101   size_t topic_strlen;
1102   size_t total_len;
1103   u16_t topic_len;
1104   u16_t remaining_length;
1105   u16_t pkt_id;
1106   struct mqtt_request_t *r;
1107 
1108   LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
1109   LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
1110 
1111   topic_strlen = strlen(topic);
1112   LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1113   topic_len = (u16_t)topic_strlen;
1114   /* Topic string, pkt_id, qos for subscribe */
1115   total_len =  topic_len + 2 + 2 + (sub != 0);
1116   LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1117   remaining_length = (u16_t)total_len;
1118 
1119   LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
1120   if (client->conn_state == TCP_DISCONNECTED) {
1121     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
1122     return ERR_CONN;
1123   }
1124 
1125   pkt_id = msg_generate_packet_id(client);
1126   r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
1127   if (r == NULL) {
1128     return ERR_MEM;
1129   }
1130 
1131   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1132     mqtt_delete_request(r);
1133     return ERR_MEM;
1134   }
1135 
1136   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
1137 
1138   mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
1139   /* Packet id */
1140   mqtt_output_append_u16(&client->output, pkt_id);
1141   /* Topic */
1142   mqtt_output_append_string(&client->output, topic, topic_len);
1143   /* QoS */
1144   if (sub != 0) {
1145     mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
1146   }
1147 
1148   mqtt_append_request(&client->pend_req_queue, r);
1149   mqtt_output_send(&client->output, client->conn);
1150   return ERR_OK;
1151 }
1152 
1153 
1154 /**
1155  * @ingroup mqtt
1156  * Set callback to handle incoming publish requests from server
1157  * @param client MQTT client
1158  * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
1159  * @param data_cb Callback for each fragment of payload that arrives
1160  * @param arg User supplied argument to both callbacks
1161  */
1162 void
1163 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
1164                              mqtt_incoming_data_cb_t data_cb, void *arg)
1165 {
1166   LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
1167   client->data_cb = data_cb;
1168   client->pub_cb = pub_cb;
1169   client->inpub_arg = arg;
1170 }
1171 
1172 /**
1173  * @ingroup mqtt
1174  * Create a new MQTT client instance
1175  * @return Pointer to instance on success, NULL otherwise
1176  */
1177 mqtt_client_t *
1178 mqtt_client_new(void)
1179 {
1180   mqtt_client_t *client = (mqtt_client_t *)mem_malloc(sizeof(mqtt_client_t));
1181   if (client != NULL) {
1182     memset(client, 0, sizeof(mqtt_client_t));
1183   }
1184   return client;
1185 }
1186 
1187 
1188 /**
1189  * @ingroup mqtt
1190  * Connect to MQTT server
1191  * @param client MQTT client
1192  * @param ip_addr Server IP
1193  * @param port Server port
1194  * @param cb Connection state change callback
1195  * @param arg User supplied argument to connection callback
1196  * @param client_info Client identification and connection options
1197  * @return ERR_OK if successful, @see err_t enum for other results
1198  */
1199 err_t
1200 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
1201                     const struct mqtt_connect_client_info_t *client_info)
1202 {
1203   err_t err;
1204   size_t len;
1205   u16_t client_id_length;
1206   /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
1207   u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
1208   u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
1209 
1210   LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
1211   LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
1212   LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
1213   LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
1214 
1215   if (client->conn_state != TCP_DISCONNECTED) {
1216     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n"));
1217     return ERR_ISCONN;
1218   }
1219 
1220   /* Wipe clean */
1221   memset(client, 0, sizeof(mqtt_client_t));
1222   client->connect_arg = arg;
1223   client->connect_cb = cb;
1224   client->keep_alive = client_info->keep_alive;
1225   mqtt_init_requests(client->req_list);
1226 
1227   /* Build connect message */
1228   if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
1229     flags |= MQTT_CONNECT_FLAG_WILL;
1230     flags |= (client_info->will_qos & 3) << 3;
1231     if (client_info->will_retain) {
1232       flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
1233     }
1234     len = strlen(client_info->will_topic);
1235     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
1236     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
1237     will_topic_len = (u8_t)len;
1238     len = strlen(client_info->will_msg);
1239     LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
1240     will_msg_len = (u8_t)len;
1241     len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
1242     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1243     remaining_length = (u16_t)len;
1244   }
1245 
1246   /* Don't complicate things, always connect using clean session */
1247   flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
1248 
1249   len = strlen(client_info->client_id);
1250   LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
1251   client_id_length = (u16_t)len;
1252   len = remaining_length + 2 + client_id_length;
1253   LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1254   remaining_length = (u16_t)len;
1255 
1256   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1257     return ERR_MEM;
1258   }
1259 
1260   client->conn = tcp_new();
1261   if (client->conn == NULL) {
1262     return ERR_MEM;
1263   }
1264 
1265   /* Set arg pointer for callbacks */
1266   tcp_arg(client->conn, client);
1267   /* Any local address, pick random local port number */
1268   err = tcp_bind(client->conn, IP_ADDR_ANY, 0);
1269   if (err != ERR_OK) {
1270     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
1271     goto tcp_fail;
1272   }
1273   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
1274 
1275   /* Connect to server */
1276   err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
1277   if (err != ERR_OK) {
1278     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
1279     goto tcp_fail;
1280   }
1281   /* Set error callback */
1282   tcp_err(client->conn, mqtt_tcp_err_cb);
1283   client->conn_state = TCP_CONNECTING;
1284 
1285   /* Append fixed header */
1286   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
1287   /* Append Protocol string */
1288   mqtt_output_append_string(&client->output, "MQTT", 4);
1289   /* Append Protocol level */
1290   mqtt_output_append_u8(&client->output, 4);
1291   /* Append connect flags */
1292   mqtt_output_append_u8(&client->output, flags);
1293   /* Append keep-alive */
1294   mqtt_output_append_u16(&client->output, client_info->keep_alive);
1295   /* Append client id */
1296   mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
1297   /* Append will message if used */
1298   if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
1299     mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
1300     mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
1301   }
1302   return ERR_OK;
1303 
1304 tcp_fail:
1305   tcp_abort(client->conn);
1306   client->conn = NULL;
1307   return err;
1308 }
1309 
1310 
1311 /**
1312  * @ingroup mqtt
1313  * Disconnect from MQTT server
1314  * @param client MQTT client
1315  */
1316 void
1317 mqtt_disconnect(mqtt_client_t *client)
1318 {
1319   LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
1320   /* If connection in not already closed */
1321   if (client->conn_state != TCP_DISCONNECTED) {
1322     /* Set conn_state before calling mqtt_close to prevent callback from being called */
1323     client->conn_state = TCP_DISCONNECTED;
1324     mqtt_close(client, (mqtt_connection_status_t)0);
1325   }
1326 }
1327 
1328 /**
1329  * @ingroup mqtt
1330  * Check connection with server
1331  * @param client MQTT client
1332  * @return 1 if connected to server, 0 otherwise
1333  */
1334 u8_t
1335 mqtt_client_is_connected(mqtt_client_t *client)
1336 {
1337   LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
1338   return client->conn_state == MQTT_CONNECTED;
1339 }
1340 
1341 #endif /* LWIP_TCP && LWIP_CALLBACK_API */
1342