1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
9  * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10  *
11  * This software is licensed as described in the file COPYING, which
12  * you should have received as part of this distribution. The terms
13  * are also available at https://curl.se/docs/copyright.html.
14  *
15  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16  * copies of the Software, and permit persons to whom the Software is
17  * furnished to do so, under the terms of the COPYING file.
18  *
19  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20  * KIND, either express or implied.
21  *
22  ***************************************************************************/
23 
24 #include "curl_setup.h"
25 
26 #ifndef CURL_DISABLE_MQTT
27 
28 #include "urldata.h"
29 #include <curl/curl.h>
30 #include "transfer.h"
31 #include "sendf.h"
32 #include "progress.h"
33 #include "mqtt.h"
34 #include "select.h"
35 #include "strdup.h"
36 #include "url.h"
37 #include "escape.h"
38 #include "warnless.h"
39 #include "curl_printf.h"
40 #include "curl_memory.h"
41 #include "multiif.h"
42 #include "rand.h"
43 
44 /* The last #include file should be: */
45 #include "memdebug.h"
46 
/* First-byte values of MQTT control packets as this file sends them or
   matches them. Incoming first bytes are masked with 0xf0 before being
   compared with the PUBLISH, SUBACK and DISCONNECT values. */
#define MQTT_MSG_CONNECT   0x10
#define MQTT_MSG_CONNACK   0x20
#define MQTT_MSG_PUBLISH   0x30
#define MQTT_MSG_SUBSCRIBE 0x82
#define MQTT_MSG_SUBACK    0x90
#define MQTT_MSG_DISCONNECT 0xe0

/* Number of bytes read after the fixed header when verifying a CONNACK and a
   SUBACK respectively */
#define MQTT_CONNACK_LEN 2
#define MQTT_SUBACK_LEN 3
/* Length of the generated client id, excluding any terminating zero */
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
57 
/*
 * Forward declarations.
 *
 * These four functions are referenced by the protocol handler table below,
 * which is defined before their bodies.
 */

static CURLcode mqtt_do(struct connectdata *conn, bool *done);
static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
static CURLcode mqtt_setup_conn(struct connectdata *conn);
66 
/*
 * MQTT protocol handler.
 *
 * Only four callbacks are provided: connection setup, the initial "do" step,
 * the repeated "doing" step and the socket selection used while in the
 * doing state. All other slots are left as ZERO_NULL.
 */

const struct Curl_handler Curl_handler_mqtt = {
  "MQTT",                             /* scheme */
  mqtt_setup_conn,                    /* setup_connection */
  mqtt_do,                            /* do_it */
  ZERO_NULL,                          /* done */
  ZERO_NULL,                          /* do_more */
  ZERO_NULL,                          /* connect_it */
  ZERO_NULL,                          /* connecting */
  mqtt_doing,                         /* doing */
  ZERO_NULL,                          /* proto_getsock */
  mqtt_getsock,                       /* doing_getsock */
  ZERO_NULL,                          /* domore_getsock */
  ZERO_NULL,                          /* perform_getsock */
  ZERO_NULL,                          /* disconnect */
  ZERO_NULL,                          /* readwrite */
  ZERO_NULL,                          /* connection_check */
  PORT_MQTT,                          /* defport */
  CURLPROTO_MQTT,                     /* protocol */
  CURLPROTO_MQTT,                     /* family */
  PROTOPT_NONE                        /* flags */
};
92 
mqtt_setup_conn(struct connectdata * conn)93 static CURLcode mqtt_setup_conn(struct connectdata *conn)
94 {
95   /* allocate the HTTP-specific struct for the Curl_easy, only to survive
96      during this request */
97   struct MQTT *mq;
98   struct Curl_easy *data = conn->data;
99   DEBUGASSERT(data->req.p.mqtt == NULL);
100 
101   mq = calloc(1, sizeof(struct MQTT));
102   if(!mq)
103     return CURLE_OUT_OF_MEMORY;
104   data->req.p.mqtt = mq;
105   return CURLE_OK;
106 }
107 
mqtt_send(struct connectdata * conn,char * buf,size_t len)108 static CURLcode mqtt_send(struct connectdata *conn,
109                           char *buf, size_t len)
110 {
111   CURLcode result = CURLE_OK;
112   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
113   struct Curl_easy *data = conn->data;
114   struct MQTT *mq = data->req.p.mqtt;
115   ssize_t n;
116   result = Curl_write(conn, sockfd, buf, len, &n);
117   if(!result)
118     Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
119   if(len != (size_t)n) {
120     size_t nsend = len - n;
121     char *sendleftovers = Curl_memdup(&buf[n], nsend);
122     if(!sendleftovers)
123       return CURLE_OUT_OF_MEMORY;
124     mq->sendleftovers = sendleftovers;
125     mq->nsend = nsend;
126   }
127   return result;
128 }
129 
130 /* Generic function called by the multi interface to figure out what socket(s)
131    to wait for and for what actions during the DOING and PROTOCONNECT
132    states */
mqtt_getsock(struct connectdata * conn,curl_socket_t * sock)133 static int mqtt_getsock(struct connectdata *conn,
134                         curl_socket_t *sock)
135 {
136   sock[0] = conn->sock[FIRSTSOCKET];
137   return GETSOCK_READSOCK(FIRSTSOCKET);
138 }
139 
mqtt_connect(struct connectdata * conn)140 static CURLcode mqtt_connect(struct connectdata *conn)
141 {
142   CURLcode result = CURLE_OK;
143   const size_t client_id_offset = 14;
144   const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
145   char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
146   const size_t clen = strlen("curl");
147   char packet[32] = {
148     MQTT_MSG_CONNECT,  /* packet type */
149     0x00,              /* remaining length */
150     0x00, 0x04,        /* protocol length */
151     'M','Q','T','T',   /* protocol name */
152     0x04,              /* protocol level */
153     0x02,              /* CONNECT flag: CleanSession */
154     0x00, 0x3c,        /* keep-alive 0 = disabled */
155     0x00, 0x00         /* payload1 length */
156   };
157   packet[1] = (packetlen - 2) & 0x7f;
158   packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
159 
160   result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[clen],
161                          MQTT_CLIENTID_LEN - clen + 1);
162   memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
163   infof(conn->data, "Using client id '%s'\n", client_id);
164   if(!result)
165     result = mqtt_send(conn, packet, packetlen);
166   return result;
167 }
168 
mqtt_disconnect(struct connectdata * conn)169 static CURLcode mqtt_disconnect(struct connectdata *conn)
170 {
171   CURLcode result = CURLE_OK;
172   result = mqtt_send(conn, (char *)"\xe0\x00", 2);
173   return result;
174 }
175 
mqtt_verify_connack(struct connectdata * conn)176 static CURLcode mqtt_verify_connack(struct connectdata *conn)
177 {
178   CURLcode result;
179   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
180   unsigned char readbuf[MQTT_CONNACK_LEN];
181   ssize_t nread;
182   struct Curl_easy *data = conn->data;
183 
184   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
185   if(result)
186     goto fail;
187 
188   Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
189 
190   /* fixme */
191   if(nread < MQTT_CONNACK_LEN) {
192     result = CURLE_WEIRD_SERVER_REPLY;
193     goto fail;
194   }
195 
196   /* verify CONNACK */
197   if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
198     failf(data, "Expected %02x%02x but got %02x%02x",
199           0x00, 0x00, readbuf[0], readbuf[1]);
200     result = CURLE_WEIRD_SERVER_REPLY;
201   }
202 
203 fail:
204   return result;
205 }
206 
mqtt_get_topic(struct connectdata * conn,char ** topic,size_t * topiclen)207 static CURLcode mqtt_get_topic(struct connectdata *conn,
208                                char **topic, size_t *topiclen)
209 {
210   CURLcode result = CURLE_OK;
211   char *path = conn->data->state.up.path;
212 
213   if(strlen(path) > 1) {
214     result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen,
215                             REJECT_NADA);
216   }
217   else {
218     failf(conn->data, "Error: No topic specified.");
219     result = CURLE_URL_MALFORMAT;
220   }
221   return result;
222 }
223 
224 
/*
 * mqtt_encode_len() stores 'len' in 'buf' as a variable-length integer:
 * seven bits of value per byte, least significant group first, with the top
 * bit of a byte set when more bytes follow.
 *
 * At least one byte is always written, so a length of zero is encoded as a
 * single 0x00 byte. At most four bytes are written; a value that does not
 * fit in four bytes is cut off after the fourth byte, which then still has
 * its continuation bit set.
 *
 * 'buf' must have room for four bytes. Returns the number of bytes written
 * (1 to 4).
 */
static int mqtt_encode_len(char *buf, size_t len)
{
  int i = 0;

  do {
    unsigned char encoded = (unsigned char)(len % 0x80);
    len /= 0x80;
    if(len)
      /* more groups follow */
      encoded |= 0x80;
    buf[i++] = (char)encoded;
  } while((len > 0) && (i < 4));

  return i;
}
240 
mqtt_subscribe(struct connectdata * conn)241 static CURLcode mqtt_subscribe(struct connectdata *conn)
242 {
243   CURLcode result = CURLE_OK;
244   char *topic = NULL;
245   size_t topiclen;
246   unsigned char *packet = NULL;
247   size_t packetlen;
248   char encodedsize[4];
249   size_t n;
250 
251   result = mqtt_get_topic(conn, &topic, &topiclen);
252   if(result)
253     goto fail;
254 
255   conn->proto.mqtt.packetid++;
256 
257   packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
258                                + 2 bytes topic length + QoS byte */
259   n = mqtt_encode_len((char *)encodedsize, packetlen);
260   packetlen += n + 1; /* add one for the control packet type byte */
261 
262   packet = malloc(packetlen);
263   if(!packet) {
264     result = CURLE_OUT_OF_MEMORY;
265     goto fail;
266   }
267 
268   packet[0] = MQTT_MSG_SUBSCRIBE;
269   memcpy(&packet[1], encodedsize, n);
270   packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
271   packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
272   packet[3 + n] = (topiclen >> 8) & 0xff;
273   packet[4 + n ] = topiclen & 0xff;
274   memcpy(&packet[5 + n], topic, topiclen);
275   packet[5 + n + topiclen] = 0; /* QoS zero */
276 
277   result = mqtt_send(conn, (char *)packet, packetlen);
278 
279 fail:
280   free(topic);
281   free(packet);
282   return result;
283 }
284 
285 /*
286  * Called when the first byte was already read.
287  */
mqtt_verify_suback(struct connectdata * conn)288 static CURLcode mqtt_verify_suback(struct connectdata *conn)
289 {
290   CURLcode result;
291   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
292   unsigned char readbuf[MQTT_SUBACK_LEN];
293   ssize_t nread;
294   struct mqtt_conn *mqtt = &conn->proto.mqtt;
295 
296   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
297   if(result)
298     goto fail;
299 
300   Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
301 
302   /* fixme */
303   if(nread < MQTT_SUBACK_LEN) {
304     result = CURLE_WEIRD_SERVER_REPLY;
305     goto fail;
306   }
307 
308   /* verify SUBACK */
309   if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
310      readbuf[1] != (mqtt->packetid & 0xff) ||
311      readbuf[2] != 0x00)
312     result = CURLE_WEIRD_SERVER_REPLY;
313 
314 fail:
315   return result;
316 }
317 
mqtt_publish(struct connectdata * conn)318 static CURLcode mqtt_publish(struct connectdata *conn)
319 {
320   CURLcode result;
321   char *payload = conn->data->set.postfields;
322   size_t payloadlen = (size_t)conn->data->set.postfieldsize;
323   char *topic = NULL;
324   size_t topiclen;
325   unsigned char *pkt = NULL;
326   size_t i = 0;
327   size_t remaininglength;
328   size_t encodelen;
329   char encodedbytes[4];
330 
331   result = mqtt_get_topic(conn, &topic, &topiclen);
332   if(result)
333     goto fail;
334 
335   remaininglength = payloadlen + 2 + topiclen;
336   encodelen = mqtt_encode_len(encodedbytes, remaininglength);
337 
338   /* add the control byte and the encoded remaining length */
339   pkt = malloc(remaininglength + 1 + encodelen);
340   if(!pkt) {
341     result = CURLE_OUT_OF_MEMORY;
342     goto fail;
343   }
344 
345   /* assemble packet */
346   pkt[i++] = MQTT_MSG_PUBLISH;
347   memcpy(&pkt[i], encodedbytes, encodelen);
348   i += encodelen;
349   pkt[i++] = (topiclen >> 8) & 0xff;
350   pkt[i++] = (topiclen & 0xff);
351   memcpy(&pkt[i], topic, topiclen);
352   i += topiclen;
353   memcpy(&pkt[i], payload, payloadlen);
354   i += payloadlen;
355   result = mqtt_send(conn, (char *)pkt, i);
356 
357 fail:
358   free(pkt);
359   free(topic);
360   return result;
361 }
362 
/*
 * mqtt_decode_len() decodes a variable-length integer from 'buf': seven bits
 * of value per byte, least significant group first, where a set top bit
 * means that another byte follows.
 *
 * Decoding stops after the first byte without the top bit set, or when
 * 'buflen' bytes have been consumed. If 'lenbytes' is non-NULL it receives
 * the number of bytes consumed. Returns the decoded value.
 */
static size_t mqtt_decode_len(unsigned char *buf,
                              size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t scale = 1;
  size_t used = 0;

  while(used < buflen) {
    unsigned char octet = buf[used++];

    value += (size_t)(octet & 0x7f) * scale;
    scale <<= 7;
    if(!(octet & 0x80))
      /* last group */
      break;
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
382 
#ifdef CURLDEBUG
/* Printable state names for the debug output in mqstate(), which indexes
   this table directly with enum mqttstate values. The order therefore has to
   follow that enum, which is declared outside this file (presumably in
   mqtt.h - verify when adding a state). */
static const char *statenames[]={
  "MQTT_FIRST",
  "MQTT_REMAINING_LENGTH",
  "MQTT_CONNACK",
  "MQTT_SUBACK",
  "MQTT_SUBACK_COMING",
  "MQTT_PUBWAIT",
  "MQTT_PUB_REMAIN",

  "NOT A STATE"
};
#endif
396 
397 /* The only way to change state */
mqstate(struct connectdata * conn,enum mqttstate state,enum mqttstate nextstate)398 static void mqstate(struct connectdata *conn,
399                     enum mqttstate state,
400                     enum mqttstate nextstate) /* used if state == FIRST */
401 {
402   struct mqtt_conn *mqtt = &conn->proto.mqtt;
403 #ifdef CURLDEBUG
404   infof(conn->data, "%s (from %s) (next is %s)\n",
405         statenames[state],
406         statenames[mqtt->state],
407         (state == MQTT_FIRST)? statenames[nextstate] : "");
408 #endif
409   mqtt->state = state;
410   if(state == MQTT_FIRST)
411     mqtt->nextstate = nextstate;
412 }
413 
414 
/* for the publish packet */
/* NOTE(review): not referenced in the part of the file visible here */
#define MQTT_HEADER_LEN 5    /* max 5 bytes */

/*
 * mqtt_read_publish() handles incoming data once a subscription has been
 * requested. mqtt_doing() calls it in the MQTT_SUBACK, MQTT_PUBWAIT and
 * MQTT_PUB_REMAIN states.
 *
 * In MQTT_SUBACK/MQTT_PUBWAIT the previously read first byte selects the
 * action: a PUBLISH starts a payload download, a SUBACK gets verified and a
 * DISCONNECT sets *done to TRUE. In MQTT_PUB_REMAIN payload data is read,
 * at most one buffer at a time, and passed on to the client as body data.
 *
 * Returns CURLE_WEIRD_SERVER_REPLY for an unexpected packet type or state,
 * CURLE_PARTIAL_FILE if a read returned zero bytes while payload remained,
 * and otherwise the result of the read, verify or client write that was
 * performed. This may be CURLE_AGAIN; it is passed back to the caller
 * unchanged.
 */
static CURLcode mqtt_read_publish(struct connectdata *conn,
                                  bool *done)
{
  CURLcode result = CURLE_OK;
  curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
  ssize_t nread;
  struct Curl_easy *data = conn->data;
  unsigned char *pkt = (unsigned char *)data->state.buffer;
  size_t remlen;
  struct mqtt_conn *mqtt = &conn->proto.mqtt;
  struct MQTT *mq = data->req.p.mqtt;
  unsigned char packet;

  switch(mqtt->state) {
  /* goto target with the same name as the enum constant below; it is jumped
     to from the MQTT_SUBACK/MQTT_PUBWAIT case */
  MQTT_SUBACK_COMING:
  case MQTT_SUBACK_COMING:
    /* NOTE(review): mqtt_doing() has no case for MQTT_SUBACK_COMING, so this
       code looks reachable through the goto only. If the verify fails, for
       example with CURLE_AGAIN, the state stays MQTT_SUBACK_COMING -
       confirm that this is intended. */
    result = mqtt_verify_suback(conn);
    if(result)
      break;

    /* subscription confirmed, wait for the first byte of the next packet */
    mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
    break;

  case MQTT_SUBACK:
  case MQTT_PUBWAIT:
    /* we are expecting PUBLISH or SUBACK */
    /* only the upper four bits of the first byte are compared */
    packet = mq->firstbyte & 0xf0;
    if(packet == MQTT_MSG_PUBLISH)
      mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
    else if(packet == MQTT_MSG_SUBACK) {
      mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
      goto MQTT_SUBACK_COMING;
    }
    else if(packet == MQTT_MSG_DISCONNECT) {
      infof(data, "Got DISCONNECT\n");
      *done = TRUE;
      goto end;
    }
    else {
      result = CURLE_WEIRD_SERVER_REPLY;
      goto end;
    }

    /* -- switched state -- */
    /* only the PUBLISH branch above gets here: set up the download using the
       remaining length that was decoded earlier */
    remlen = mq->remaining_length;
    /* NOTE(review): remlen is a size_t while %zd is the signed conversion */
    infof(data, "Remaining length: %zd bytes\n", remlen);
    Curl_pgrsSetDownloadSize(data, remlen);
    data->req.bytecount = 0;
    data->req.size = remlen;
    mq->npacket = remlen; /* get this many bytes */
    /* FALLTHROUGH */
  case MQTT_PUB_REMAIN: {
    /* read rest of packet, but no more. Cap to buffer size */
    struct SingleRequest *k = &data->req;
    size_t rest = mq->npacket;
    if(rest > (size_t)data->set.buffer_size)
      rest = (size_t)data->set.buffer_size;
    result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
    if(result) {
      if(CURLE_AGAIN == result) {
        infof(data, "EEEE AAAAGAIN\n");
      }
      goto end;
    }
    if(!nread) {
      /* zero bytes without an error is treated as the server having closed
         the connection before the whole payload arrived */
      infof(data, "server disconnected\n");
      result = CURLE_PARTIAL_FILE;
      goto end;
    }
    Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);

    /* account for the received bytes, both for this packet and in the
       progress counters */
    mq->npacket -= nread;
    k->bytecount += nread;
    Curl_pgrsSetDownloadCounter(data, k->bytecount);

    /* if QoS is set, message contains packet id */

    result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
    if(result)
      goto end;

    if(!mq->npacket)
      /* no more PUBLISH payload, back to subscribe wait state */
      mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
    break;
  }
  default:
    DEBUGASSERT(NULL); /* illegal state */
    result = CURLE_WEIRD_SERVER_REPLY;
    goto end;
  }
  end:
  return result;
}
512 
mqtt_do(struct connectdata * conn,bool * done)513 static CURLcode mqtt_do(struct connectdata *conn, bool *done)
514 {
515   CURLcode result = CURLE_OK;
516   struct Curl_easy *data = conn->data;
517 
518   *done = FALSE; /* unconditionally */
519 
520   result = mqtt_connect(conn);
521   if(result) {
522     failf(data, "Error %d sending MQTT CONN request", result);
523     return result;
524   }
525   mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
526   return CURLE_OK;
527 }
528 
mqtt_doing(struct connectdata * conn,bool * done)529 static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
530 {
531   CURLcode result = CURLE_OK;
532   struct mqtt_conn *mqtt = &conn->proto.mqtt;
533   struct Curl_easy *data = conn->data;
534   struct MQTT *mq = data->req.p.mqtt;
535   ssize_t nread;
536   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
537   unsigned char *pkt = (unsigned char *)data->state.buffer;
538   unsigned char byte;
539 
540   *done = FALSE;
541 
542   if(mq->nsend) {
543     /* send the remainder of an outgoing packet */
544     char *ptr = mq->sendleftovers;
545     result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
546     free(ptr);
547     if(result)
548       return result;
549   }
550 
551   infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
552   switch(mqtt->state) {
553   case MQTT_FIRST:
554     /* Read the initial byte only */
555     result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
556     if(result)
557       break;
558     Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
559     /* remember the first byte */
560     mq->npacket = 0;
561     mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
562     /* FALLTHROUGH */
563   case MQTT_REMAINING_LENGTH:
564     do {
565       result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
566       if(result)
567         break;
568       Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
569       pkt[mq->npacket++] = byte;
570     } while((byte & 0x80) && (mq->npacket < 4));
571     if(result)
572       break;
573     mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
574     mq->npacket = 0;
575     if(mq->remaining_length) {
576       mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
577       break;
578     }
579     mqstate(conn, MQTT_FIRST, MQTT_FIRST);
580 
581     if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
582       infof(data, "Got DISCONNECT\n");
583       *done = TRUE;
584     }
585     break;
586   case MQTT_CONNACK:
587     result = mqtt_verify_connack(conn);
588     if(result)
589       break;
590 
591     if(conn->data->state.httpreq == HTTPREQ_POST) {
592       result = mqtt_publish(conn);
593       if(!result) {
594         result = mqtt_disconnect(conn);
595         *done = TRUE;
596       }
597       mqtt->nextstate = MQTT_FIRST;
598     }
599     else {
600       result = mqtt_subscribe(conn);
601       if(!result) {
602         mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
603       }
604     }
605     break;
606 
607   case MQTT_SUBACK:
608   case MQTT_PUBWAIT:
609   case MQTT_PUB_REMAIN:
610     result = mqtt_read_publish(conn, done);
611     break;
612 
613   default:
614     failf(conn->data, "State not handled yet");
615     *done = TRUE;
616     break;
617   }
618 
619   if(result == CURLE_AGAIN)
620     result = CURLE_OK;
621   return result;
622 }
623 
624 #endif /* CURL_DISABLE_MQTT */
625