1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
9  * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10  *
11  * This software is licensed as described in the file COPYING, which
12  * you should have received as part of this distribution. The terms
13  * are also available at https://curl.haxx.se/docs/copyright.html.
14  *
15  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16  * copies of the Software, and permit persons to whom the Software is
17  * furnished to do so, under the terms of the COPYING file.
18  *
19  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20  * KIND, either express or implied.
21  *
22  ***************************************************************************/
23 
24 #include "curl_setup.h"
25 
26 #ifdef CURL_ENABLE_MQTT
27 
28 #include "urldata.h"
29 #include <curl/curl.h>
30 #include "transfer.h"
31 #include "sendf.h"
32 #include "progress.h"
33 #include "mqtt.h"
34 #include "select.h"
35 #include "strdup.h"
36 #include "url.h"
37 #include "escape.h"
38 #include "warnless.h"
39 #include "curl_printf.h"
40 #include "curl_memory.h"
41 #include "multiif.h"
42 #include "rand.h"
43 
44 /* The last #include file should be: */
45 #include "memdebug.h"
46 
/* First byte of the MQTT control packets this file sends or recognizes. */
#define MQTT_MSG_CONNECT    0x10
#define MQTT_MSG_CONNACK    0x20
#define MQTT_MSG_PUBLISH    0x30
#define MQTT_MSG_SUBSCRIBE  0x82
#define MQTT_MSG_SUBACK     0x90
#define MQTT_MSG_DISCONNECT 0xe0

/* Number of bytes read when verifying a CONNACK / SUBACK packet body. */
#define MQTT_CONNACK_LEN    2
#define MQTT_SUBACK_LEN     3

/* Length of the generated client id, e.g. "curl0123abcd". */
#define MQTT_CLIENTID_LEN   12
57 
/*
 * Forward declarations of the callbacks referenced by the handler table
 * below; their definitions follow later in this file.
 */

static CURLcode mqtt_do(struct connectdata *conn, bool *done);
static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
static CURLcode mqtt_setup_conn(struct connectdata *conn);

/*
 * MQTT protocol handler.
 *
 * Only four callbacks are provided: connection setup, the initial "do"
 * step, the repeated "doing" step and the socket query used while in the
 * doing phase. Every other slot is left as ZERO_NULL.
 */

const struct Curl_handler Curl_handler_mqtt = {
  "MQTT",                             /* scheme */
  mqtt_setup_conn,                    /* setup_connection */
  mqtt_do,                            /* do_it */
  ZERO_NULL,                          /* done */
  ZERO_NULL,                          /* do_more */
  ZERO_NULL,                          /* connect_it */
  ZERO_NULL,                          /* connecting */
  mqtt_doing,                         /* doing */
  ZERO_NULL,                          /* proto_getsock */
  mqtt_getsock,                       /* doing_getsock */
  ZERO_NULL,                          /* domore_getsock */
  ZERO_NULL,                          /* perform_getsock */
  ZERO_NULL,                          /* disconnect */
  ZERO_NULL,                          /* readwrite */
  ZERO_NULL,                          /* connection_check */
  PORT_MQTT,                          /* defport */
  CURLPROTO_MQTT,                     /* protocol */
  PROTOPT_NONE                        /* flags */
};
91 
mqtt_setup_conn(struct connectdata * conn)92 static CURLcode mqtt_setup_conn(struct connectdata *conn)
93 {
94   /* allocate the HTTP-specific struct for the Curl_easy, only to survive
95      during this request */
96   struct MQTT *mq;
97   struct Curl_easy *data = conn->data;
98   DEBUGASSERT(data->req.protop == NULL);
99 
100   mq = calloc(1, sizeof(struct MQTT));
101   if(!mq)
102     return CURLE_OUT_OF_MEMORY;
103   data->req.protop = mq;
104   return CURLE_OK;
105 }
106 
mqtt_send(struct connectdata * conn,char * buf,size_t len)107 static CURLcode mqtt_send(struct connectdata *conn,
108                           char *buf, size_t len)
109 {
110   CURLcode result = CURLE_OK;
111   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
112   struct Curl_easy *data = conn->data;
113   struct MQTT *mq = data->req.protop;
114   ssize_t n;
115   result = Curl_write(conn, sockfd, buf, len, &n);
116   if(!result && data->set.verbose)
117     Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
118   if(len != (size_t)n) {
119     size_t nsend = len - n;
120     char *sendleftovers = Curl_memdup(&buf[n], nsend);
121     if(!sendleftovers)
122       return CURLE_OUT_OF_MEMORY;
123     mq->sendleftovers = sendleftovers;
124     mq->nsend = nsend;
125   }
126   return result;
127 }
128 
129 /* Generic function called by the multi interface to figure out what socket(s)
130    to wait for and for what actions during the DOING and PROTOCONNECT
131    states */
mqtt_getsock(struct connectdata * conn,curl_socket_t * sock)132 static int mqtt_getsock(struct connectdata *conn,
133                         curl_socket_t *sock)
134 {
135   sock[0] = conn->sock[FIRSTSOCKET];
136   return GETSOCK_READSOCK(FIRSTSOCKET);
137 }
138 
mqtt_connect(struct connectdata * conn)139 static CURLcode mqtt_connect(struct connectdata *conn)
140 {
141   CURLcode result = CURLE_OK;
142   const size_t client_id_offset = 14;
143   const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
144   char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
145   const size_t curl_len = strlen("curl");
146   char packet[32] = {
147     MQTT_MSG_CONNECT,  /* packet type */
148     0x00,              /* remaining length */
149     0x00, 0x04,        /* protocol length */
150     'M','Q','T','T',   /* protocol name */
151     0x04,              /* protocol level */
152     0x02,              /* CONNECT flag: CleanSession */
153     0x00, 0x3c,        /* keep-alive 0 = disabled */
154     0x00, 0x00         /* payload1 length */
155   };
156   packet[1] = (packetlen - 2) & 0x7f;
157   packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
158 
159   result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len],
160                          MQTT_CLIENTID_LEN - curl_len + 1);
161   memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
162   infof(conn->data, "Using client id '%s'\n", client_id);
163   if(!result)
164     result = mqtt_send(conn, packet, packetlen);
165   return result;
166 }
167 
mqtt_disconnect(struct connectdata * conn)168 static CURLcode mqtt_disconnect(struct connectdata *conn)
169 {
170   CURLcode result = CURLE_OK;
171   result = mqtt_send(conn, (char *)"\xe0\x00", 2);
172   return result;
173 }
174 
mqtt_verify_connack(struct connectdata * conn)175 static CURLcode mqtt_verify_connack(struct connectdata *conn)
176 {
177   CURLcode result;
178   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
179   unsigned char readbuf[MQTT_CONNACK_LEN];
180   ssize_t nread;
181   struct Curl_easy *data = conn->data;
182 
183   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
184   if(result)
185     goto fail;
186 
187   if(data->set.verbose)
188     Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
189 
190   /* fixme */
191   if(nread < MQTT_CONNACK_LEN) {
192     result = CURLE_WEIRD_SERVER_REPLY;
193     goto fail;
194   }
195 
196   /* verify CONNACK */
197   if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
198     failf(data, "Expected %02x%02x but got %02x%02x",
199           0x00, 0x00, readbuf[0], readbuf[1]);
200     result = CURLE_WEIRD_SERVER_REPLY;
201   }
202 
203 fail:
204   return result;
205 }
206 
mqtt_get_topic(struct connectdata * conn,char ** topic,size_t * topiclen)207 static CURLcode mqtt_get_topic(struct connectdata *conn,
208                                char **topic, size_t *topiclen)
209 {
210   CURLcode result = CURLE_OK;
211   char *path = conn->data->state.up.path;
212 
213   if(strlen(path) > 1) {
214     result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen,
215                             REJECT_NADA);
216   }
217   else {
218     failf(conn->data, "Error: No topic specified.");
219     result = CURLE_URL_MALFORMAT;
220   }
221   return result;
222 }
223 
224 
/*
 * mqtt_encode_len() stores 'len' in 'buf' using the MQTT variable-length
 * encoding: seven value bits per byte, least significant group first, with
 * the top bit set on every byte that is followed by another one.
 *
 * 'buf' must have room for four bytes. Returns the number of bytes written,
 * which is always between 1 and 4; a length of zero is encoded as the single
 * byte 0x00.
 *
 * At most four bytes are produced, so values above 268435455 (0x0fffffff)
 * cannot be represented: the output is then cut off after four bytes and is
 * not a valid encoding. Callers are expected to keep 'len' within range.
 */
static int mqtt_encode_len(char *buf, size_t len)
{
  int i = 0;

  do {
    unsigned char encoded = (unsigned char)(len % 0x80);
    len /= 0x80;
    if(len)
      /* more groups follow */
      encoded |= 0x80;
    buf[i++] = (char)encoded;
  } while((len > 0) && (i < 4));

  return i;
}
240 
mqtt_subscribe(struct connectdata * conn)241 static CURLcode mqtt_subscribe(struct connectdata *conn)
242 {
243   CURLcode result = CURLE_OK;
244   char *topic = NULL;
245   size_t topiclen;
246   unsigned char *packet = NULL;
247   size_t packetlen;
248   char encodedsize[4];
249   size_t n;
250 
251   result = mqtt_get_topic(conn, &topic, &topiclen);
252   if(result)
253     goto fail;
254 
255   conn->proto.mqtt.packetid++;
256 
257   packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
258                                + 2 bytes topic length + QoS byte */
259   n = mqtt_encode_len((char *)encodedsize, packetlen);
260   packetlen += n + 1; /* add one for the control packet type byte */
261 
262   packet = malloc(packetlen);
263   if(!packet) {
264     result = CURLE_OUT_OF_MEMORY;
265     goto fail;
266   }
267 
268   packet[0] = MQTT_MSG_SUBSCRIBE;
269   memcpy(&packet[1], encodedsize, n);
270   packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
271   packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
272   packet[3 + n] = (topiclen >> 8) & 0xff;
273   packet[4 + n ] = topiclen & 0xff;
274   memcpy(&packet[5 + n], topic, topiclen);
275   packet[5 + n + topiclen] = 0; /* QoS zero */
276 
277   result = mqtt_send(conn, (char *)packet, packetlen);
278 
279 fail:
280   free(topic);
281   free(packet);
282   return result;
283 }
284 
285 /*
286  * Called when the first byte was already read.
287  */
mqtt_verify_suback(struct connectdata * conn)288 static CURLcode mqtt_verify_suback(struct connectdata *conn)
289 {
290   CURLcode result;
291   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
292   unsigned char readbuf[MQTT_SUBACK_LEN];
293   ssize_t nread;
294   struct mqtt_conn *mqtt = &conn->proto.mqtt;
295 
296   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
297   if(result)
298     goto fail;
299 
300   if(conn->data->set.verbose)
301     Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
302 
303   /* fixme */
304   if(nread < MQTT_SUBACK_LEN) {
305     result = CURLE_WEIRD_SERVER_REPLY;
306     goto fail;
307   }
308 
309   /* verify SUBACK */
310   if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
311      readbuf[1] != (mqtt->packetid & 0xff) ||
312      readbuf[2] != 0x00)
313     result = CURLE_WEIRD_SERVER_REPLY;
314 
315 fail:
316   return result;
317 }
318 
mqtt_publish(struct connectdata * conn)319 static CURLcode mqtt_publish(struct connectdata *conn)
320 {
321   CURLcode result;
322   char *payload = conn->data->set.postfields;
323   size_t payloadlen = (size_t)conn->data->set.postfieldsize;
324   char *topic = NULL;
325   size_t topiclen;
326   unsigned char *pkt = NULL;
327   size_t i = 0;
328   size_t remaininglength;
329   size_t encodelen;
330   char encodedbytes[4];
331 
332   result = mqtt_get_topic(conn, &topic, &topiclen);
333   if(result)
334     goto fail;
335 
336   remaininglength = payloadlen + 2 + topiclen;
337   encodelen = mqtt_encode_len(encodedbytes, remaininglength);
338 
339   /* add the control byte and the encoded remaining length */
340   pkt = malloc(remaininglength + 1 + encodelen);
341   if(!pkt) {
342     result = CURLE_OUT_OF_MEMORY;
343     goto fail;
344   }
345 
346   /* assemble packet */
347   pkt[i++] = MQTT_MSG_PUBLISH;
348   memcpy(&pkt[i], encodedbytes, encodelen);
349   i += encodelen;
350   pkt[i++] = (topiclen >> 8) & 0xff;
351   pkt[i++] = (topiclen & 0xff);
352   memcpy(&pkt[i], topic, topiclen);
353   i += topiclen;
354   memcpy(&pkt[i], payload, payloadlen);
355   i += payloadlen;
356   result = mqtt_send(conn, (char *)pkt, i);
357 
358 fail:
359   free(pkt);
360   free(topic);
361   return result;
362 }
363 
/*
 * mqtt_decode_len() decodes an MQTT variable-length integer from 'buf'.
 *
 * Bytes are consumed until one without the top (continuation) bit has been
 * read or 'buflen' bytes have been used, whichever comes first. Each byte
 * contributes its low seven bits, least significant group first.
 *
 * Returns the decoded value. If 'lenbytes' is non-NULL it receives the
 * number of bytes consumed. With 'buflen' zero, both are zero.
 */
static size_t mqtt_decode_len(unsigned char *buf,
                              size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t scale = 1;
  size_t used = 0;

  while(used < buflen) {
    unsigned char byte = buf[used++];

    value += (byte & 127) * scale;
    scale *= 128;
    if(!(byte & 128))
      /* last byte of the encoded number */
      break;
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
383 
384 #ifdef CURLDEBUG
385 static const char *statenames[]={
386   "MQTT_FIRST",
387   "MQTT_REMAINING_LENGTH",
388   "MQTT_CONNACK",
389   "MQTT_SUBACK",
390   "MQTT_SUBACK_COMING",
391   "MQTT_PUBWAIT",
392   "MQTT_PUB_REMAIN",
393 
394   "NOT A STATE"
395 };
396 #endif
397 
398 /* The only way to change state */
mqstate(struct connectdata * conn,enum mqttstate state,enum mqttstate nextstate)399 static void mqstate(struct connectdata *conn,
400                     enum mqttstate state,
401                     enum mqttstate nextstate) /* used if state == FIRST */
402 {
403   struct mqtt_conn *mqtt = &conn->proto.mqtt;
404 #ifdef CURLDEBUG
405   infof(conn->data, "%s (from %s) (next is %s)\n",
406         statenames[state],
407         statenames[mqtt->state],
408         (state == MQTT_FIRST)? statenames[nextstate] : "");
409 #endif
410   mqtt->state = state;
411   if(state == MQTT_FIRST)
412     mqtt->nextstate = nextstate;
413 }
414 
415 
416 /* for the publish packet */
417 #define MQTT_HEADER_LEN 5    /* max 5 bytes */
418 
mqtt_read_publish(struct connectdata * conn,bool * done)419 static CURLcode mqtt_read_publish(struct connectdata *conn,
420                                   bool *done)
421 {
422   CURLcode result = CURLE_OK;
423   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
424   ssize_t nread;
425   struct Curl_easy *data = conn->data;
426   unsigned char *pkt = (unsigned char *)data->state.buffer;
427   size_t remlen;
428   struct mqtt_conn *mqtt = &conn->proto.mqtt;
429   struct MQTT *mq = data->req.protop;
430   unsigned char packet;
431 
432   switch(mqtt->state) {
433   MQTT_SUBACK_COMING:
434   case MQTT_SUBACK_COMING:
435     result = mqtt_verify_suback(conn);
436     if(result)
437       break;
438 
439     mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
440     break;
441 
442   case MQTT_SUBACK:
443   case MQTT_PUBWAIT:
444     /* we are expecting PUBLISH or SUBACK */
445     packet = mq->firstbyte & 0xf0;
446     if(packet == MQTT_MSG_PUBLISH)
447       mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
448     else if(packet == MQTT_MSG_SUBACK) {
449       mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
450       goto MQTT_SUBACK_COMING;
451     }
452     else if(packet == MQTT_MSG_DISCONNECT) {
453       infof(data, "Got DISCONNECT\n");
454       *done = TRUE;
455       goto end;
456     }
457     else {
458       result = CURLE_WEIRD_SERVER_REPLY;
459       goto end;
460     }
461 
462     /* -- switched state -- */
463     remlen = mq->remaining_length;
464     infof(data, "Remaining length: %zd bytes\n", remlen);
465     Curl_pgrsSetDownloadSize(data, remlen);
466     data->req.bytecount = 0;
467     data->req.size = remlen;
468     mq->npacket = remlen; /* get this many bytes */
469     /* FALLTHROUGH */
470   case MQTT_PUB_REMAIN: {
471     /* read rest of packet, but no more. Cap to buffer size */
472     struct SingleRequest *k = &data->req;
473     size_t rest = mq->npacket;
474     if(rest > (size_t)data->set.buffer_size)
475       rest = (size_t)data->set.buffer_size;
476     result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
477     if(result) {
478       if(CURLE_AGAIN == result) {
479         infof(data, "EEEE AAAAGAIN\n");
480       }
481       goto end;
482     }
483     if(!nread) {
484       infof(data, "server disconnected\n");
485       result = CURLE_PARTIAL_FILE;
486       goto end;
487     }
488     if(data->set.verbose)
489       Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
490 
491     mq->npacket -= nread;
492     k->bytecount += nread;
493     Curl_pgrsSetDownloadCounter(data, k->bytecount);
494 
495     /* if QoS is set, message contains packet id */
496 
497     result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
498     if(result)
499       goto end;
500 
501     if(!mq->npacket)
502       /* no more PUBLISH payload, back to subscribe wait state */
503       mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
504     break;
505   }
506   default:
507     DEBUGASSERT(NULL); /* illegal state */
508     result = CURLE_WEIRD_SERVER_REPLY;
509     goto end;
510   }
511   end:
512   return result;
513 }
514 
mqtt_do(struct connectdata * conn,bool * done)515 static CURLcode mqtt_do(struct connectdata *conn, bool *done)
516 {
517   CURLcode result = CURLE_OK;
518   struct Curl_easy *data = conn->data;
519 
520   *done = FALSE; /* unconditionally */
521 
522   result = mqtt_connect(conn);
523   if(result) {
524     failf(data, "Error %d sending MQTT CONN request", result);
525     return result;
526   }
527   mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
528   return CURLE_OK;
529 }
530 
mqtt_doing(struct connectdata * conn,bool * done)531 static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
532 {
533   CURLcode result = CURLE_OK;
534   struct mqtt_conn *mqtt = &conn->proto.mqtt;
535   struct Curl_easy *data = conn->data;
536   struct MQTT *mq = data->req.protop;
537   ssize_t nread;
538   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
539   unsigned char *pkt = (unsigned char *)data->state.buffer;
540   unsigned char byte;
541 
542   *done = FALSE;
543 
544   if(mq->nsend) {
545     /* send the remainder of an outgoing packet */
546     char *ptr = mq->sendleftovers;
547     result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
548     free(ptr);
549     if(result)
550       return result;
551   }
552 
553   infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
554   switch(mqtt->state) {
555   case MQTT_FIRST:
556     /* Read the initial byte only */
557     result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
558     if(result)
559       break;
560     if(data->set.verbose)
561       Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
562     /* remember the first byte */
563     mq->npacket = 0;
564     mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
565     /* FALLTHROUGH */
566   case MQTT_REMAINING_LENGTH:
567     do {
568       result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
569       if(result)
570         break;
571       if(data->set.verbose)
572         Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
573       pkt[mq->npacket++] = byte;
574     } while((byte & 0x80) && (mq->npacket < 4));
575     if(result)
576       break;
577     mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
578     mq->npacket = 0;
579     if(mq->remaining_length) {
580       mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
581       break;
582     }
583     mqstate(conn, MQTT_FIRST, MQTT_FIRST);
584 
585     if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
586       infof(data, "Got DISCONNECT\n");
587       *done = TRUE;
588     }
589     break;
590   case MQTT_CONNACK:
591     result = mqtt_verify_connack(conn);
592     if(result)
593       break;
594 
595     if(conn->data->state.httpreq == HTTPREQ_POST) {
596       result = mqtt_publish(conn);
597       if(!result) {
598         result = mqtt_disconnect(conn);
599         *done = TRUE;
600       }
601       mqtt->nextstate = MQTT_FIRST;
602     }
603     else {
604       result = mqtt_subscribe(conn);
605       if(!result) {
606         mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
607       }
608     }
609     break;
610 
611   case MQTT_SUBACK:
612   case MQTT_PUBWAIT:
613   case MQTT_PUB_REMAIN:
614     result = mqtt_read_publish(conn, done);
615     break;
616 
617   default:
618     failf(conn->data, "State not handled yet");
619     *done = TRUE;
620     break;
621   }
622 
623   if(result == CURLE_AGAIN)
624     result = CURLE_OK;
625   return result;
626 }
627 
628 #endif /* CURL_ENABLE_MQTT */
629