1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
9 * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10 *
11 * This software is licensed as described in the file COPYING, which
12 * you should have received as part of this distribution. The terms
13 * are also available at https://curl.haxx.se/docs/copyright.html.
14 *
15 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16 * copies of the Software, and permit persons to whom the Software is
17 * furnished to do so, under the terms of the COPYING file.
18 *
19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 * KIND, either express or implied.
21 *
22 ***************************************************************************/
23
24 #include "curl_setup.h"
25
26 #ifdef CURL_ENABLE_MQTT
27
28 #include "urldata.h"
29 #include <curl/curl.h>
30 #include "transfer.h"
31 #include "sendf.h"
32 #include "progress.h"
33 #include "mqtt.h"
34 #include "select.h"
35 #include "strdup.h"
36 #include "url.h"
37 #include "escape.h"
38 #include "warnless.h"
39 #include "curl_printf.h"
40 #include "curl_memory.h"
41 #include "multiif.h"
42 #include "rand.h"
43
44 /* The last #include file should be: */
45 #include "memdebug.h"
46
47 #define MQTT_MSG_CONNECT 0x10
48 #define MQTT_MSG_CONNACK 0x20
49 #define MQTT_MSG_PUBLISH 0x30
50 #define MQTT_MSG_SUBSCRIBE 0x82
51 #define MQTT_MSG_SUBACK 0x90
52 #define MQTT_MSG_DISCONNECT 0xe0
53
54 #define MQTT_CONNACK_LEN 2
55 #define MQTT_SUBACK_LEN 3
56 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
57
58 /*
59 * Forward declarations.
60 */
61
62 static CURLcode mqtt_do(struct connectdata *conn, bool *done);
63 static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
64 static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
65 static CURLcode mqtt_setup_conn(struct connectdata *conn);
66
67 /*
68 * MQTT protocol handler.
69 */
70
71 const struct Curl_handler Curl_handler_mqtt = {
72 "MQTT", /* scheme */
73 mqtt_setup_conn, /* setup_connection */
74 mqtt_do, /* do_it */
75 ZERO_NULL, /* done */
76 ZERO_NULL, /* do_more */
77 ZERO_NULL, /* connect_it */
78 ZERO_NULL, /* connecting */
79 mqtt_doing, /* doing */
80 ZERO_NULL, /* proto_getsock */
81 mqtt_getsock, /* doing_getsock */
82 ZERO_NULL, /* domore_getsock */
83 ZERO_NULL, /* perform_getsock */
84 ZERO_NULL, /* disconnect */
85 ZERO_NULL, /* readwrite */
86 ZERO_NULL, /* connection_check */
87 PORT_MQTT, /* defport */
88 CURLPROTO_MQTT, /* protocol */
89 PROTOPT_NONE /* flags */
90 };
91
mqtt_setup_conn(struct connectdata * conn)92 static CURLcode mqtt_setup_conn(struct connectdata *conn)
93 {
94 /* allocate the HTTP-specific struct for the Curl_easy, only to survive
95 during this request */
96 struct MQTT *mq;
97 struct Curl_easy *data = conn->data;
98 DEBUGASSERT(data->req.protop == NULL);
99
100 mq = calloc(1, sizeof(struct MQTT));
101 if(!mq)
102 return CURLE_OUT_OF_MEMORY;
103 data->req.protop = mq;
104 return CURLE_OK;
105 }
106
mqtt_send(struct connectdata * conn,char * buf,size_t len)107 static CURLcode mqtt_send(struct connectdata *conn,
108 char *buf, size_t len)
109 {
110 CURLcode result = CURLE_OK;
111 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
112 struct Curl_easy *data = conn->data;
113 struct MQTT *mq = data->req.protop;
114 ssize_t n;
115 result = Curl_write(conn, sockfd, buf, len, &n);
116 if(!result && data->set.verbose)
117 Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
118 if(len != (size_t)n) {
119 size_t nsend = len - n;
120 char *sendleftovers = Curl_memdup(&buf[n], nsend);
121 if(!sendleftovers)
122 return CURLE_OUT_OF_MEMORY;
123 mq->sendleftovers = sendleftovers;
124 mq->nsend = nsend;
125 }
126 return result;
127 }
128
129 /* Generic function called by the multi interface to figure out what socket(s)
130 to wait for and for what actions during the DOING and PROTOCONNECT
131 states */
mqtt_getsock(struct connectdata * conn,curl_socket_t * sock)132 static int mqtt_getsock(struct connectdata *conn,
133 curl_socket_t *sock)
134 {
135 sock[0] = conn->sock[FIRSTSOCKET];
136 return GETSOCK_READSOCK(FIRSTSOCKET);
137 }
138
mqtt_connect(struct connectdata * conn)139 static CURLcode mqtt_connect(struct connectdata *conn)
140 {
141 CURLcode result = CURLE_OK;
142 const size_t client_id_offset = 14;
143 const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
144 char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
145 const size_t curl_len = strlen("curl");
146 char packet[32] = {
147 MQTT_MSG_CONNECT, /* packet type */
148 0x00, /* remaining length */
149 0x00, 0x04, /* protocol length */
150 'M','Q','T','T', /* protocol name */
151 0x04, /* protocol level */
152 0x02, /* CONNECT flag: CleanSession */
153 0x00, 0x3c, /* keep-alive 0 = disabled */
154 0x00, 0x00 /* payload1 length */
155 };
156 packet[1] = (packetlen - 2) & 0x7f;
157 packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
158
159 result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len],
160 MQTT_CLIENTID_LEN - curl_len + 1);
161 memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
162 infof(conn->data, "Using client id '%s'\n", client_id);
163 if(!result)
164 result = mqtt_send(conn, packet, packetlen);
165 return result;
166 }
167
mqtt_disconnect(struct connectdata * conn)168 static CURLcode mqtt_disconnect(struct connectdata *conn)
169 {
170 CURLcode result = CURLE_OK;
171 result = mqtt_send(conn, (char *)"\xe0\x00", 2);
172 return result;
173 }
174
mqtt_verify_connack(struct connectdata * conn)175 static CURLcode mqtt_verify_connack(struct connectdata *conn)
176 {
177 CURLcode result;
178 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
179 unsigned char readbuf[MQTT_CONNACK_LEN];
180 ssize_t nread;
181 struct Curl_easy *data = conn->data;
182
183 result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
184 if(result)
185 goto fail;
186
187 if(data->set.verbose)
188 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
189
190 /* fixme */
191 if(nread < MQTT_CONNACK_LEN) {
192 result = CURLE_WEIRD_SERVER_REPLY;
193 goto fail;
194 }
195
196 /* verify CONNACK */
197 if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
198 failf(data, "Expected %02x%02x but got %02x%02x",
199 0x00, 0x00, readbuf[0], readbuf[1]);
200 result = CURLE_WEIRD_SERVER_REPLY;
201 }
202
203 fail:
204 return result;
205 }
206
mqtt_get_topic(struct connectdata * conn,char ** topic,size_t * topiclen)207 static CURLcode mqtt_get_topic(struct connectdata *conn,
208 char **topic, size_t *topiclen)
209 {
210 CURLcode result = CURLE_OK;
211 char *path = conn->data->state.up.path;
212
213 if(strlen(path) > 1) {
214 result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen,
215 REJECT_NADA);
216 }
217 else {
218 failf(conn->data, "Error: No topic specified.");
219 result = CURLE_URL_MALFORMAT;
220 }
221 return result;
222 }
223
224
mqtt_encode_len(char * buf,size_t len)225 static int mqtt_encode_len(char *buf, size_t len)
226 {
227 unsigned char encoded;
228 int i;
229
230 for(i = 0; (len > 0) && (i<4); i++) {
231 encoded = len % 0x80;
232 len /= 0x80;
233 if(len)
234 encoded |= 0x80;
235 buf[i] = encoded;
236 }
237
238 return i;
239 }
240
mqtt_subscribe(struct connectdata * conn)241 static CURLcode mqtt_subscribe(struct connectdata *conn)
242 {
243 CURLcode result = CURLE_OK;
244 char *topic = NULL;
245 size_t topiclen;
246 unsigned char *packet = NULL;
247 size_t packetlen;
248 char encodedsize[4];
249 size_t n;
250
251 result = mqtt_get_topic(conn, &topic, &topiclen);
252 if(result)
253 goto fail;
254
255 conn->proto.mqtt.packetid++;
256
257 packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
258 + 2 bytes topic length + QoS byte */
259 n = mqtt_encode_len((char *)encodedsize, packetlen);
260 packetlen += n + 1; /* add one for the control packet type byte */
261
262 packet = malloc(packetlen);
263 if(!packet) {
264 result = CURLE_OUT_OF_MEMORY;
265 goto fail;
266 }
267
268 packet[0] = MQTT_MSG_SUBSCRIBE;
269 memcpy(&packet[1], encodedsize, n);
270 packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
271 packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
272 packet[3 + n] = (topiclen >> 8) & 0xff;
273 packet[4 + n ] = topiclen & 0xff;
274 memcpy(&packet[5 + n], topic, topiclen);
275 packet[5 + n + topiclen] = 0; /* QoS zero */
276
277 result = mqtt_send(conn, (char *)packet, packetlen);
278
279 fail:
280 free(topic);
281 free(packet);
282 return result;
283 }
284
285 /*
286 * Called when the first byte was already read.
287 */
mqtt_verify_suback(struct connectdata * conn)288 static CURLcode mqtt_verify_suback(struct connectdata *conn)
289 {
290 CURLcode result;
291 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
292 unsigned char readbuf[MQTT_SUBACK_LEN];
293 ssize_t nread;
294 struct mqtt_conn *mqtt = &conn->proto.mqtt;
295
296 result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
297 if(result)
298 goto fail;
299
300 if(conn->data->set.verbose)
301 Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
302
303 /* fixme */
304 if(nread < MQTT_SUBACK_LEN) {
305 result = CURLE_WEIRD_SERVER_REPLY;
306 goto fail;
307 }
308
309 /* verify SUBACK */
310 if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
311 readbuf[1] != (mqtt->packetid & 0xff) ||
312 readbuf[2] != 0x00)
313 result = CURLE_WEIRD_SERVER_REPLY;
314
315 fail:
316 return result;
317 }
318
mqtt_publish(struct connectdata * conn)319 static CURLcode mqtt_publish(struct connectdata *conn)
320 {
321 CURLcode result;
322 char *payload = conn->data->set.postfields;
323 size_t payloadlen = (size_t)conn->data->set.postfieldsize;
324 char *topic = NULL;
325 size_t topiclen;
326 unsigned char *pkt = NULL;
327 size_t i = 0;
328 size_t remaininglength;
329 size_t encodelen;
330 char encodedbytes[4];
331
332 result = mqtt_get_topic(conn, &topic, &topiclen);
333 if(result)
334 goto fail;
335
336 remaininglength = payloadlen + 2 + topiclen;
337 encodelen = mqtt_encode_len(encodedbytes, remaininglength);
338
339 /* add the control byte and the encoded remaining length */
340 pkt = malloc(remaininglength + 1 + encodelen);
341 if(!pkt) {
342 result = CURLE_OUT_OF_MEMORY;
343 goto fail;
344 }
345
346 /* assemble packet */
347 pkt[i++] = MQTT_MSG_PUBLISH;
348 memcpy(&pkt[i], encodedbytes, encodelen);
349 i += encodelen;
350 pkt[i++] = (topiclen >> 8) & 0xff;
351 pkt[i++] = (topiclen & 0xff);
352 memcpy(&pkt[i], topic, topiclen);
353 i += topiclen;
354 memcpy(&pkt[i], payload, payloadlen);
355 i += payloadlen;
356 result = mqtt_send(conn, (char *)pkt, i);
357
358 fail:
359 free(pkt);
360 free(topic);
361 return result;
362 }
363
mqtt_decode_len(unsigned char * buf,size_t buflen,size_t * lenbytes)364 static size_t mqtt_decode_len(unsigned char *buf,
365 size_t buflen, size_t *lenbytes)
366 {
367 size_t len = 0;
368 size_t mult = 1;
369 size_t i;
370 unsigned char encoded = 128;
371
372 for(i = 0; (i < buflen) && (encoded & 128); i++) {
373 encoded = buf[i];
374 len += (encoded & 127) * mult;
375 mult *= 128;
376 }
377
378 if(lenbytes)
379 *lenbytes = i;
380
381 return len;
382 }
383
384 #ifdef CURLDEBUG
385 static const char *statenames[]={
386 "MQTT_FIRST",
387 "MQTT_REMAINING_LENGTH",
388 "MQTT_CONNACK",
389 "MQTT_SUBACK",
390 "MQTT_SUBACK_COMING",
391 "MQTT_PUBWAIT",
392 "MQTT_PUB_REMAIN",
393
394 "NOT A STATE"
395 };
396 #endif
397
398 /* The only way to change state */
mqstate(struct connectdata * conn,enum mqttstate state,enum mqttstate nextstate)399 static void mqstate(struct connectdata *conn,
400 enum mqttstate state,
401 enum mqttstate nextstate) /* used if state == FIRST */
402 {
403 struct mqtt_conn *mqtt = &conn->proto.mqtt;
404 #ifdef CURLDEBUG
405 infof(conn->data, "%s (from %s) (next is %s)\n",
406 statenames[state],
407 statenames[mqtt->state],
408 (state == MQTT_FIRST)? statenames[nextstate] : "");
409 #endif
410 mqtt->state = state;
411 if(state == MQTT_FIRST)
412 mqtt->nextstate = nextstate;
413 }
414
415
416 /* for the publish packet */
417 #define MQTT_HEADER_LEN 5 /* max 5 bytes */
418
mqtt_read_publish(struct connectdata * conn,bool * done)419 static CURLcode mqtt_read_publish(struct connectdata *conn,
420 bool *done)
421 {
422 CURLcode result = CURLE_OK;
423 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
424 ssize_t nread;
425 struct Curl_easy *data = conn->data;
426 unsigned char *pkt = (unsigned char *)data->state.buffer;
427 size_t remlen;
428 struct mqtt_conn *mqtt = &conn->proto.mqtt;
429 struct MQTT *mq = data->req.protop;
430 unsigned char packet;
431
432 switch(mqtt->state) {
433 MQTT_SUBACK_COMING:
434 case MQTT_SUBACK_COMING:
435 result = mqtt_verify_suback(conn);
436 if(result)
437 break;
438
439 mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
440 break;
441
442 case MQTT_SUBACK:
443 case MQTT_PUBWAIT:
444 /* we are expecting PUBLISH or SUBACK */
445 packet = mq->firstbyte & 0xf0;
446 if(packet == MQTT_MSG_PUBLISH)
447 mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
448 else if(packet == MQTT_MSG_SUBACK) {
449 mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
450 goto MQTT_SUBACK_COMING;
451 }
452 else if(packet == MQTT_MSG_DISCONNECT) {
453 infof(data, "Got DISCONNECT\n");
454 *done = TRUE;
455 goto end;
456 }
457 else {
458 result = CURLE_WEIRD_SERVER_REPLY;
459 goto end;
460 }
461
462 /* -- switched state -- */
463 remlen = mq->remaining_length;
464 infof(data, "Remaining length: %zd bytes\n", remlen);
465 Curl_pgrsSetDownloadSize(data, remlen);
466 data->req.bytecount = 0;
467 data->req.size = remlen;
468 mq->npacket = remlen; /* get this many bytes */
469 /* FALLTHROUGH */
470 case MQTT_PUB_REMAIN: {
471 /* read rest of packet, but no more. Cap to buffer size */
472 struct SingleRequest *k = &data->req;
473 size_t rest = mq->npacket;
474 if(rest > (size_t)data->set.buffer_size)
475 rest = (size_t)data->set.buffer_size;
476 result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
477 if(result) {
478 if(CURLE_AGAIN == result) {
479 infof(data, "EEEE AAAAGAIN\n");
480 }
481 goto end;
482 }
483 if(!nread) {
484 infof(data, "server disconnected\n");
485 result = CURLE_PARTIAL_FILE;
486 goto end;
487 }
488 if(data->set.verbose)
489 Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
490
491 mq->npacket -= nread;
492 k->bytecount += nread;
493 Curl_pgrsSetDownloadCounter(data, k->bytecount);
494
495 /* if QoS is set, message contains packet id */
496
497 result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
498 if(result)
499 goto end;
500
501 if(!mq->npacket)
502 /* no more PUBLISH payload, back to subscribe wait state */
503 mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
504 break;
505 }
506 default:
507 DEBUGASSERT(NULL); /* illegal state */
508 result = CURLE_WEIRD_SERVER_REPLY;
509 goto end;
510 }
511 end:
512 return result;
513 }
514
mqtt_do(struct connectdata * conn,bool * done)515 static CURLcode mqtt_do(struct connectdata *conn, bool *done)
516 {
517 CURLcode result = CURLE_OK;
518 struct Curl_easy *data = conn->data;
519
520 *done = FALSE; /* unconditionally */
521
522 result = mqtt_connect(conn);
523 if(result) {
524 failf(data, "Error %d sending MQTT CONN request", result);
525 return result;
526 }
527 mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
528 return CURLE_OK;
529 }
530
mqtt_doing(struct connectdata * conn,bool * done)531 static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
532 {
533 CURLcode result = CURLE_OK;
534 struct mqtt_conn *mqtt = &conn->proto.mqtt;
535 struct Curl_easy *data = conn->data;
536 struct MQTT *mq = data->req.protop;
537 ssize_t nread;
538 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
539 unsigned char *pkt = (unsigned char *)data->state.buffer;
540 unsigned char byte;
541
542 *done = FALSE;
543
544 if(mq->nsend) {
545 /* send the remainder of an outgoing packet */
546 char *ptr = mq->sendleftovers;
547 result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
548 free(ptr);
549 if(result)
550 return result;
551 }
552
553 infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
554 switch(mqtt->state) {
555 case MQTT_FIRST:
556 /* Read the initial byte only */
557 result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
558 if(result)
559 break;
560 if(data->set.verbose)
561 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
562 /* remember the first byte */
563 mq->npacket = 0;
564 mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
565 /* FALLTHROUGH */
566 case MQTT_REMAINING_LENGTH:
567 do {
568 result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
569 if(result)
570 break;
571 if(data->set.verbose)
572 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
573 pkt[mq->npacket++] = byte;
574 } while((byte & 0x80) && (mq->npacket < 4));
575 if(result)
576 break;
577 mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
578 mq->npacket = 0;
579 if(mq->remaining_length) {
580 mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
581 break;
582 }
583 mqstate(conn, MQTT_FIRST, MQTT_FIRST);
584
585 if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
586 infof(data, "Got DISCONNECT\n");
587 *done = TRUE;
588 }
589 break;
590 case MQTT_CONNACK:
591 result = mqtt_verify_connack(conn);
592 if(result)
593 break;
594
595 if(conn->data->state.httpreq == HTTPREQ_POST) {
596 result = mqtt_publish(conn);
597 if(!result) {
598 result = mqtt_disconnect(conn);
599 *done = TRUE;
600 }
601 mqtt->nextstate = MQTT_FIRST;
602 }
603 else {
604 result = mqtt_subscribe(conn);
605 if(!result) {
606 mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
607 }
608 }
609 break;
610
611 case MQTT_SUBACK:
612 case MQTT_PUBWAIT:
613 case MQTT_PUB_REMAIN:
614 result = mqtt_read_publish(conn, done);
615 break;
616
617 default:
618 failf(conn->data, "State not handled yet");
619 *done = TRUE;
620 break;
621 }
622
623 if(result == CURLE_AGAIN)
624 result = CURLE_OK;
625 return result;
626 }
627
628 #endif /* CURL_ENABLE_MQTT */
629