1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
9 * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10 *
11 * This software is licensed as described in the file COPYING, which
12 * you should have received as part of this distribution. The terms
13 * are also available at https://curl.se/docs/copyright.html.
14 *
15 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16 * copies of the Software, and permit persons to whom the Software is
17 * furnished to do so, under the terms of the COPYING file.
18 *
19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 * KIND, either express or implied.
21 *
22 ***************************************************************************/
23
24 #include "curl_setup.h"
25
26 #ifndef CURL_DISABLE_MQTT
27
28 #include "urldata.h"
29 #include <curl/curl.h>
30 #include "transfer.h"
31 #include "sendf.h"
32 #include "progress.h"
33 #include "mqtt.h"
34 #include "select.h"
35 #include "strdup.h"
36 #include "url.h"
37 #include "escape.h"
38 #include "warnless.h"
39 #include "curl_printf.h"
40 #include "curl_memory.h"
41 #include "multiif.h"
42 #include "rand.h"
43
44 /* The last #include file should be: */
45 #include "memdebug.h"
46
47 #define MQTT_MSG_CONNECT 0x10
48 #define MQTT_MSG_CONNACK 0x20
49 #define MQTT_MSG_PUBLISH 0x30
50 #define MQTT_MSG_SUBSCRIBE 0x82
51 #define MQTT_MSG_SUBACK 0x90
52 #define MQTT_MSG_DISCONNECT 0xe0
53
54 #define MQTT_CONNACK_LEN 2
55 #define MQTT_SUBACK_LEN 3
56 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
57
58 /*
59 * Forward declarations.
60 */
61
62 static CURLcode mqtt_do(struct connectdata *conn, bool *done);
63 static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
64 static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
65 static CURLcode mqtt_setup_conn(struct connectdata *conn);
66
67 /*
68 * MQTT protocol handler.
69 */
70
71 const struct Curl_handler Curl_handler_mqtt = {
72 "MQTT", /* scheme */
73 mqtt_setup_conn, /* setup_connection */
74 mqtt_do, /* do_it */
75 ZERO_NULL, /* done */
76 ZERO_NULL, /* do_more */
77 ZERO_NULL, /* connect_it */
78 ZERO_NULL, /* connecting */
79 mqtt_doing, /* doing */
80 ZERO_NULL, /* proto_getsock */
81 mqtt_getsock, /* doing_getsock */
82 ZERO_NULL, /* domore_getsock */
83 ZERO_NULL, /* perform_getsock */
84 ZERO_NULL, /* disconnect */
85 ZERO_NULL, /* readwrite */
86 ZERO_NULL, /* connection_check */
87 PORT_MQTT, /* defport */
88 CURLPROTO_MQTT, /* protocol */
89 CURLPROTO_MQTT, /* family */
90 PROTOPT_NONE /* flags */
91 };
92
mqtt_setup_conn(struct connectdata * conn)93 static CURLcode mqtt_setup_conn(struct connectdata *conn)
94 {
95 /* allocate the HTTP-specific struct for the Curl_easy, only to survive
96 during this request */
97 struct MQTT *mq;
98 struct Curl_easy *data = conn->data;
99 DEBUGASSERT(data->req.p.mqtt == NULL);
100
101 mq = calloc(1, sizeof(struct MQTT));
102 if(!mq)
103 return CURLE_OUT_OF_MEMORY;
104 data->req.p.mqtt = mq;
105 return CURLE_OK;
106 }
107
mqtt_send(struct connectdata * conn,char * buf,size_t len)108 static CURLcode mqtt_send(struct connectdata *conn,
109 char *buf, size_t len)
110 {
111 CURLcode result = CURLE_OK;
112 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
113 struct Curl_easy *data = conn->data;
114 struct MQTT *mq = data->req.p.mqtt;
115 ssize_t n;
116 result = Curl_write(conn, sockfd, buf, len, &n);
117 if(!result)
118 Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
119 if(len != (size_t)n) {
120 size_t nsend = len - n;
121 char *sendleftovers = Curl_memdup(&buf[n], nsend);
122 if(!sendleftovers)
123 return CURLE_OUT_OF_MEMORY;
124 mq->sendleftovers = sendleftovers;
125 mq->nsend = nsend;
126 }
127 return result;
128 }
129
130 /* Generic function called by the multi interface to figure out what socket(s)
131 to wait for and for what actions during the DOING and PROTOCONNECT
132 states */
mqtt_getsock(struct connectdata * conn,curl_socket_t * sock)133 static int mqtt_getsock(struct connectdata *conn,
134 curl_socket_t *sock)
135 {
136 sock[0] = conn->sock[FIRSTSOCKET];
137 return GETSOCK_READSOCK(FIRSTSOCKET);
138 }
139
mqtt_connect(struct connectdata * conn)140 static CURLcode mqtt_connect(struct connectdata *conn)
141 {
142 CURLcode result = CURLE_OK;
143 const size_t client_id_offset = 14;
144 const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
145 char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
146 const size_t clen = strlen("curl");
147 char packet[32] = {
148 MQTT_MSG_CONNECT, /* packet type */
149 0x00, /* remaining length */
150 0x00, 0x04, /* protocol length */
151 'M','Q','T','T', /* protocol name */
152 0x04, /* protocol level */
153 0x02, /* CONNECT flag: CleanSession */
154 0x00, 0x3c, /* keep-alive 0 = disabled */
155 0x00, 0x00 /* payload1 length */
156 };
157 packet[1] = (packetlen - 2) & 0x7f;
158 packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
159
160 result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[clen],
161 MQTT_CLIENTID_LEN - clen + 1);
162 memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
163 infof(conn->data, "Using client id '%s'\n", client_id);
164 if(!result)
165 result = mqtt_send(conn, packet, packetlen);
166 return result;
167 }
168
mqtt_disconnect(struct connectdata * conn)169 static CURLcode mqtt_disconnect(struct connectdata *conn)
170 {
171 CURLcode result = CURLE_OK;
172 result = mqtt_send(conn, (char *)"\xe0\x00", 2);
173 return result;
174 }
175
mqtt_verify_connack(struct connectdata * conn)176 static CURLcode mqtt_verify_connack(struct connectdata *conn)
177 {
178 CURLcode result;
179 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
180 unsigned char readbuf[MQTT_CONNACK_LEN];
181 ssize_t nread;
182 struct Curl_easy *data = conn->data;
183
184 result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
185 if(result)
186 goto fail;
187
188 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
189
190 /* fixme */
191 if(nread < MQTT_CONNACK_LEN) {
192 result = CURLE_WEIRD_SERVER_REPLY;
193 goto fail;
194 }
195
196 /* verify CONNACK */
197 if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
198 failf(data, "Expected %02x%02x but got %02x%02x",
199 0x00, 0x00, readbuf[0], readbuf[1]);
200 result = CURLE_WEIRD_SERVER_REPLY;
201 }
202
203 fail:
204 return result;
205 }
206
mqtt_get_topic(struct connectdata * conn,char ** topic,size_t * topiclen)207 static CURLcode mqtt_get_topic(struct connectdata *conn,
208 char **topic, size_t *topiclen)
209 {
210 CURLcode result = CURLE_OK;
211 char *path = conn->data->state.up.path;
212
213 if(strlen(path) > 1) {
214 result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen,
215 REJECT_NADA);
216 }
217 else {
218 failf(conn->data, "Error: No topic specified.");
219 result = CURLE_URL_MALFORMAT;
220 }
221 return result;
222 }
223
224
mqtt_encode_len(char * buf,size_t len)225 static int mqtt_encode_len(char *buf, size_t len)
226 {
227 unsigned char encoded;
228 int i;
229
230 for(i = 0; (len > 0) && (i<4); i++) {
231 encoded = len % 0x80;
232 len /= 0x80;
233 if(len)
234 encoded |= 0x80;
235 buf[i] = encoded;
236 }
237
238 return i;
239 }
240
mqtt_subscribe(struct connectdata * conn)241 static CURLcode mqtt_subscribe(struct connectdata *conn)
242 {
243 CURLcode result = CURLE_OK;
244 char *topic = NULL;
245 size_t topiclen;
246 unsigned char *packet = NULL;
247 size_t packetlen;
248 char encodedsize[4];
249 size_t n;
250
251 result = mqtt_get_topic(conn, &topic, &topiclen);
252 if(result)
253 goto fail;
254
255 conn->proto.mqtt.packetid++;
256
257 packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
258 + 2 bytes topic length + QoS byte */
259 n = mqtt_encode_len((char *)encodedsize, packetlen);
260 packetlen += n + 1; /* add one for the control packet type byte */
261
262 packet = malloc(packetlen);
263 if(!packet) {
264 result = CURLE_OUT_OF_MEMORY;
265 goto fail;
266 }
267
268 packet[0] = MQTT_MSG_SUBSCRIBE;
269 memcpy(&packet[1], encodedsize, n);
270 packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
271 packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
272 packet[3 + n] = (topiclen >> 8) & 0xff;
273 packet[4 + n ] = topiclen & 0xff;
274 memcpy(&packet[5 + n], topic, topiclen);
275 packet[5 + n + topiclen] = 0; /* QoS zero */
276
277 result = mqtt_send(conn, (char *)packet, packetlen);
278
279 fail:
280 free(topic);
281 free(packet);
282 return result;
283 }
284
285 /*
286 * Called when the first byte was already read.
287 */
mqtt_verify_suback(struct connectdata * conn)288 static CURLcode mqtt_verify_suback(struct connectdata *conn)
289 {
290 CURLcode result;
291 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
292 unsigned char readbuf[MQTT_SUBACK_LEN];
293 ssize_t nread;
294 struct mqtt_conn *mqtt = &conn->proto.mqtt;
295
296 result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
297 if(result)
298 goto fail;
299
300 Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
301
302 /* fixme */
303 if(nread < MQTT_SUBACK_LEN) {
304 result = CURLE_WEIRD_SERVER_REPLY;
305 goto fail;
306 }
307
308 /* verify SUBACK */
309 if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
310 readbuf[1] != (mqtt->packetid & 0xff) ||
311 readbuf[2] != 0x00)
312 result = CURLE_WEIRD_SERVER_REPLY;
313
314 fail:
315 return result;
316 }
317
mqtt_publish(struct connectdata * conn)318 static CURLcode mqtt_publish(struct connectdata *conn)
319 {
320 CURLcode result;
321 char *payload = conn->data->set.postfields;
322 size_t payloadlen = (size_t)conn->data->set.postfieldsize;
323 char *topic = NULL;
324 size_t topiclen;
325 unsigned char *pkt = NULL;
326 size_t i = 0;
327 size_t remaininglength;
328 size_t encodelen;
329 char encodedbytes[4];
330
331 result = mqtt_get_topic(conn, &topic, &topiclen);
332 if(result)
333 goto fail;
334
335 remaininglength = payloadlen + 2 + topiclen;
336 encodelen = mqtt_encode_len(encodedbytes, remaininglength);
337
338 /* add the control byte and the encoded remaining length */
339 pkt = malloc(remaininglength + 1 + encodelen);
340 if(!pkt) {
341 result = CURLE_OUT_OF_MEMORY;
342 goto fail;
343 }
344
345 /* assemble packet */
346 pkt[i++] = MQTT_MSG_PUBLISH;
347 memcpy(&pkt[i], encodedbytes, encodelen);
348 i += encodelen;
349 pkt[i++] = (topiclen >> 8) & 0xff;
350 pkt[i++] = (topiclen & 0xff);
351 memcpy(&pkt[i], topic, topiclen);
352 i += topiclen;
353 memcpy(&pkt[i], payload, payloadlen);
354 i += payloadlen;
355 result = mqtt_send(conn, (char *)pkt, i);
356
357 fail:
358 free(pkt);
359 free(topic);
360 return result;
361 }
362
mqtt_decode_len(unsigned char * buf,size_t buflen,size_t * lenbytes)363 static size_t mqtt_decode_len(unsigned char *buf,
364 size_t buflen, size_t *lenbytes)
365 {
366 size_t len = 0;
367 size_t mult = 1;
368 size_t i;
369 unsigned char encoded = 128;
370
371 for(i = 0; (i < buflen) && (encoded & 128); i++) {
372 encoded = buf[i];
373 len += (encoded & 127) * mult;
374 mult *= 128;
375 }
376
377 if(lenbytes)
378 *lenbytes = i;
379
380 return len;
381 }
382
383 #ifdef CURLDEBUG
384 static const char *statenames[]={
385 "MQTT_FIRST",
386 "MQTT_REMAINING_LENGTH",
387 "MQTT_CONNACK",
388 "MQTT_SUBACK",
389 "MQTT_SUBACK_COMING",
390 "MQTT_PUBWAIT",
391 "MQTT_PUB_REMAIN",
392
393 "NOT A STATE"
394 };
395 #endif
396
397 /* The only way to change state */
mqstate(struct connectdata * conn,enum mqttstate state,enum mqttstate nextstate)398 static void mqstate(struct connectdata *conn,
399 enum mqttstate state,
400 enum mqttstate nextstate) /* used if state == FIRST */
401 {
402 struct mqtt_conn *mqtt = &conn->proto.mqtt;
403 #ifdef CURLDEBUG
404 infof(conn->data, "%s (from %s) (next is %s)\n",
405 statenames[state],
406 statenames[mqtt->state],
407 (state == MQTT_FIRST)? statenames[nextstate] : "");
408 #endif
409 mqtt->state = state;
410 if(state == MQTT_FIRST)
411 mqtt->nextstate = nextstate;
412 }
413
414
415 /* for the publish packet */
416 #define MQTT_HEADER_LEN 5 /* max 5 bytes */
417
mqtt_read_publish(struct connectdata * conn,bool * done)418 static CURLcode mqtt_read_publish(struct connectdata *conn,
419 bool *done)
420 {
421 CURLcode result = CURLE_OK;
422 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
423 ssize_t nread;
424 struct Curl_easy *data = conn->data;
425 unsigned char *pkt = (unsigned char *)data->state.buffer;
426 size_t remlen;
427 struct mqtt_conn *mqtt = &conn->proto.mqtt;
428 struct MQTT *mq = data->req.p.mqtt;
429 unsigned char packet;
430
431 switch(mqtt->state) {
432 MQTT_SUBACK_COMING:
433 case MQTT_SUBACK_COMING:
434 result = mqtt_verify_suback(conn);
435 if(result)
436 break;
437
438 mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
439 break;
440
441 case MQTT_SUBACK:
442 case MQTT_PUBWAIT:
443 /* we are expecting PUBLISH or SUBACK */
444 packet = mq->firstbyte & 0xf0;
445 if(packet == MQTT_MSG_PUBLISH)
446 mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
447 else if(packet == MQTT_MSG_SUBACK) {
448 mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
449 goto MQTT_SUBACK_COMING;
450 }
451 else if(packet == MQTT_MSG_DISCONNECT) {
452 infof(data, "Got DISCONNECT\n");
453 *done = TRUE;
454 goto end;
455 }
456 else {
457 result = CURLE_WEIRD_SERVER_REPLY;
458 goto end;
459 }
460
461 /* -- switched state -- */
462 remlen = mq->remaining_length;
463 infof(data, "Remaining length: %zd bytes\n", remlen);
464 Curl_pgrsSetDownloadSize(data, remlen);
465 data->req.bytecount = 0;
466 data->req.size = remlen;
467 mq->npacket = remlen; /* get this many bytes */
468 /* FALLTHROUGH */
469 case MQTT_PUB_REMAIN: {
470 /* read rest of packet, but no more. Cap to buffer size */
471 struct SingleRequest *k = &data->req;
472 size_t rest = mq->npacket;
473 if(rest > (size_t)data->set.buffer_size)
474 rest = (size_t)data->set.buffer_size;
475 result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
476 if(result) {
477 if(CURLE_AGAIN == result) {
478 infof(data, "EEEE AAAAGAIN\n");
479 }
480 goto end;
481 }
482 if(!nread) {
483 infof(data, "server disconnected\n");
484 result = CURLE_PARTIAL_FILE;
485 goto end;
486 }
487 Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
488
489 mq->npacket -= nread;
490 k->bytecount += nread;
491 Curl_pgrsSetDownloadCounter(data, k->bytecount);
492
493 /* if QoS is set, message contains packet id */
494
495 result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
496 if(result)
497 goto end;
498
499 if(!mq->npacket)
500 /* no more PUBLISH payload, back to subscribe wait state */
501 mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
502 break;
503 }
504 default:
505 DEBUGASSERT(NULL); /* illegal state */
506 result = CURLE_WEIRD_SERVER_REPLY;
507 goto end;
508 }
509 end:
510 return result;
511 }
512
mqtt_do(struct connectdata * conn,bool * done)513 static CURLcode mqtt_do(struct connectdata *conn, bool *done)
514 {
515 CURLcode result = CURLE_OK;
516 struct Curl_easy *data = conn->data;
517
518 *done = FALSE; /* unconditionally */
519
520 result = mqtt_connect(conn);
521 if(result) {
522 failf(data, "Error %d sending MQTT CONN request", result);
523 return result;
524 }
525 mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
526 return CURLE_OK;
527 }
528
mqtt_doing(struct connectdata * conn,bool * done)529 static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
530 {
531 CURLcode result = CURLE_OK;
532 struct mqtt_conn *mqtt = &conn->proto.mqtt;
533 struct Curl_easy *data = conn->data;
534 struct MQTT *mq = data->req.p.mqtt;
535 ssize_t nread;
536 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
537 unsigned char *pkt = (unsigned char *)data->state.buffer;
538 unsigned char byte;
539
540 *done = FALSE;
541
542 if(mq->nsend) {
543 /* send the remainder of an outgoing packet */
544 char *ptr = mq->sendleftovers;
545 result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
546 free(ptr);
547 if(result)
548 return result;
549 }
550
551 infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
552 switch(mqtt->state) {
553 case MQTT_FIRST:
554 /* Read the initial byte only */
555 result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
556 if(result)
557 break;
558 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
559 /* remember the first byte */
560 mq->npacket = 0;
561 mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
562 /* FALLTHROUGH */
563 case MQTT_REMAINING_LENGTH:
564 do {
565 result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
566 if(result)
567 break;
568 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
569 pkt[mq->npacket++] = byte;
570 } while((byte & 0x80) && (mq->npacket < 4));
571 if(result)
572 break;
573 mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
574 mq->npacket = 0;
575 if(mq->remaining_length) {
576 mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
577 break;
578 }
579 mqstate(conn, MQTT_FIRST, MQTT_FIRST);
580
581 if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
582 infof(data, "Got DISCONNECT\n");
583 *done = TRUE;
584 }
585 break;
586 case MQTT_CONNACK:
587 result = mqtt_verify_connack(conn);
588 if(result)
589 break;
590
591 if(conn->data->state.httpreq == HTTPREQ_POST) {
592 result = mqtt_publish(conn);
593 if(!result) {
594 result = mqtt_disconnect(conn);
595 *done = TRUE;
596 }
597 mqtt->nextstate = MQTT_FIRST;
598 }
599 else {
600 result = mqtt_subscribe(conn);
601 if(!result) {
602 mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
603 }
604 }
605 break;
606
607 case MQTT_SUBACK:
608 case MQTT_PUBWAIT:
609 case MQTT_PUB_REMAIN:
610 result = mqtt_read_publish(conn, done);
611 break;
612
613 default:
614 failf(conn->data, "State not handled yet");
615 *done = TRUE;
616 break;
617 }
618
619 if(result == CURLE_AGAIN)
620 result = CURLE_OK;
621 return result;
622 }
623
624 #endif /* CURL_DISABLE_MQTT */
625