1 /* purple
2 *
3 * Purple is the legal property of its developers, whose names are too numerous
4 * to list here. Please refer to the COPYRIGHT file distributed with this
5 * source distribution.
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
20 */
21
22 #include "internal.h"
23
24 #include <glib/gprintf.h>
25 #include <stdarg.h>
26 #include <string.h>
27
28 #include "account.h"
29 #include "eventloop.h"
30 #include "glibcompat.h"
31 #include "sslconn.h"
32
33 #include "marshal.h"
34 #include "mqtt.h"
35 #include "util.h"
36
37 struct _FbMqttPrivate
38 {
39 PurpleConnection *gc;
40 PurpleSslConnection *gsc;
41 gboolean connected;
42 guint16 mid;
43
44 GByteArray *rbuf;
45 GByteArray *wbuf;
46 gsize remz;
47
48 gint tev;
49 gint rev;
50 gint wev;
51 };
52
53 struct _FbMqttMessagePrivate
54 {
55 FbMqttMessageType type;
56 FbMqttMessageFlags flags;
57
58 GByteArray *bytes;
59 guint offset;
60 guint pos;
61
62 gboolean local;
63 };
64
65 G_DEFINE_TYPE_WITH_CODE(FbMqtt, fb_mqtt, G_TYPE_OBJECT, G_ADD_PRIVATE(FbMqtt));
66 G_DEFINE_TYPE_WITH_CODE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT, G_ADD_PRIVATE(FbMqttMessage));
67
68 static void
fb_mqtt_dispose(GObject * obj)69 fb_mqtt_dispose(GObject *obj)
70 {
71 FbMqtt *mqtt = FB_MQTT(obj);
72 FbMqttPrivate *priv = mqtt->priv;
73
74 fb_mqtt_close(mqtt);
75 g_byte_array_free(priv->rbuf, TRUE);
76 g_byte_array_free(priv->wbuf, TRUE);
77 }
78
79 static void
fb_mqtt_class_init(FbMqttClass * klass)80 fb_mqtt_class_init(FbMqttClass *klass)
81 {
82 GObjectClass *gklass = G_OBJECT_CLASS(klass);
83
84 gklass->dispose = fb_mqtt_dispose;
85 g_type_class_add_private(klass, sizeof (FbMqttPrivate));
86
87 /**
88 * FbMqtt::connect:
89 * @mqtt: The #FbMqtt.
90 *
91 * Emitted upon the successful completion of the connection
92 * process. This is emitted as a result of #fb_mqtt_connect().
93 */
94 g_signal_new("connect",
95 G_TYPE_FROM_CLASS(klass),
96 G_SIGNAL_ACTION,
97 0,
98 NULL, NULL,
99 fb_marshal_VOID__VOID,
100 G_TYPE_NONE,
101 0);
102
103 /**
104 * FbMqtt::error:
105 * @mqtt: The #FbMqtt.
106 * @error: The #GError.
107 *
108 * Emitted whenever an error is hit within the #FbMqtt. This
109 * should close the #FbMqtt with #fb_mqtt_close().
110 */
111 g_signal_new("error",
112 G_TYPE_FROM_CLASS(klass),
113 G_SIGNAL_ACTION,
114 0,
115 NULL, NULL,
116 fb_marshal_VOID__POINTER,
117 G_TYPE_NONE,
118 1, G_TYPE_POINTER);
119
120 /**
121 * FbMqtt::open:
122 * @mqtt: The #FbMqtt.
123 *
124 * Emitted upon the successful opening of the remote socket.
125 * This is emitted as a result of #fb_mqtt_open(). This should
126 * call #fb_mqtt_connect().
127 */
128 g_signal_new("open",
129 G_TYPE_FROM_CLASS(klass),
130 G_SIGNAL_ACTION,
131 0,
132 NULL, NULL,
133 fb_marshal_VOID__VOID,
134 G_TYPE_NONE,
135 0);
136
137 /**
138 * FbMqtt::publish:
139 * @mqtt: The #FbMqtt.
140 * @topic: The topic.
141 * @pload: The payload.
142 *
143 * Emitted upon an incoming message from the steam.
144 */
145 g_signal_new("publish",
146 G_TYPE_FROM_CLASS(klass),
147 G_SIGNAL_ACTION,
148 0,
149 NULL, NULL,
150 fb_marshal_VOID__STRING_BOXED,
151 G_TYPE_NONE,
152 2, G_TYPE_STRING, G_TYPE_BYTE_ARRAY);
153 }
154
155 static void
fb_mqtt_init(FbMqtt * mqtt)156 fb_mqtt_init(FbMqtt *mqtt)
157 {
158 FbMqttPrivate *priv;
159
160 priv = G_TYPE_INSTANCE_GET_PRIVATE(mqtt, FB_TYPE_MQTT, FbMqttPrivate);
161 mqtt->priv = priv;
162
163 priv->rbuf = g_byte_array_new();
164 priv->wbuf = g_byte_array_new();
165 }
166
167 static void
fb_mqtt_message_dispose(GObject * obj)168 fb_mqtt_message_dispose(GObject *obj)
169 {
170 FbMqttMessagePrivate *priv = FB_MQTT_MESSAGE(obj)->priv;
171
172 if ((priv->bytes != NULL) && priv->local) {
173 g_byte_array_free(priv->bytes, TRUE);
174 }
175 }
176
177 static void
fb_mqtt_message_class_init(FbMqttMessageClass * klass)178 fb_mqtt_message_class_init(FbMqttMessageClass *klass)
179 {
180 GObjectClass *gklass = G_OBJECT_CLASS(klass);
181
182 gklass->dispose = fb_mqtt_message_dispose;
183 g_type_class_add_private(klass, sizeof (FbMqttMessagePrivate));
184 }
185
186 static void
fb_mqtt_message_init(FbMqttMessage * msg)187 fb_mqtt_message_init(FbMqttMessage *msg)
188 {
189 FbMqttMessagePrivate *priv;
190
191 priv = G_TYPE_INSTANCE_GET_PRIVATE(msg, FB_TYPE_MQTT_MESSAGE,
192 FbMqttMessagePrivate);
193 msg->priv = priv;
194 }
195
196 GQuark
fb_mqtt_error_quark(void)197 fb_mqtt_error_quark(void)
198 {
199 static GQuark q = 0;
200
201 if (G_UNLIKELY(q == 0)) {
202 q = g_quark_from_static_string("fb-mqtt-error-quark");
203 }
204
205 return q;
206 }
207
208 GQuark
fb_mqtt_ssl_error_quark(void)209 fb_mqtt_ssl_error_quark(void)
210 {
211 static GQuark q = 0;
212
213 if (G_UNLIKELY(q == 0)) {
214 q = g_quark_from_static_string("fb-mqtt-ssl-error-quark");
215 }
216
217 return q;
218 }
219
220 FbMqtt *
fb_mqtt_new(PurpleConnection * gc)221 fb_mqtt_new(PurpleConnection *gc)
222 {
223 FbMqtt *mqtt;
224 FbMqttPrivate *priv;
225
226 g_return_val_if_fail(PURPLE_IS_CONNECTION(gc), NULL);
227
228 mqtt = g_object_new(FB_TYPE_MQTT, NULL);
229 priv = mqtt->priv;
230 priv->gc = gc;
231
232 return mqtt;
233 };
234
235 void
fb_mqtt_close(FbMqtt * mqtt)236 fb_mqtt_close(FbMqtt *mqtt)
237 {
238 FbMqttPrivate *priv;
239
240 g_return_if_fail(FB_IS_MQTT(mqtt));
241 priv = mqtt->priv;
242
243 if (priv->wev > 0) {
244 purple_input_remove(priv->wev);
245 priv->wev = 0;
246 }
247
248 if (priv->rev > 0) {
249 purple_input_remove(priv->rev);
250 priv->rev = 0;
251 }
252
253 if (priv->tev > 0) {
254 purple_timeout_remove(priv->tev);
255 priv->tev = 0;
256 }
257
258 if (priv->gsc != NULL) {
259 purple_ssl_close(priv->gsc);
260 priv->gsc = NULL;
261 }
262
263 if (priv->wbuf->len > 0) {
264 fb_util_debug_warning("Closing with unwritten data");
265 }
266
267 priv->connected = FALSE;
268 g_byte_array_set_size(priv->rbuf, 0);
269 g_byte_array_set_size(priv->wbuf, 0);
270 }
271
272 void
fb_mqtt_error(FbMqtt * mqtt,FbMqttError error,const gchar * format,...)273 fb_mqtt_error(FbMqtt *mqtt, FbMqttError error, const gchar *format, ...)
274 {
275 GError *err;
276 va_list ap;
277
278 g_return_if_fail(FB_IS_MQTT(mqtt));
279
280 va_start(ap, format);
281 err = g_error_new_valist(FB_MQTT_ERROR, error, format, ap);
282 va_end(ap);
283
284 g_signal_emit_by_name(mqtt, "error", err);
285 g_error_free(err);
286 }
287
288 static gboolean
fb_mqtt_cb_timeout(gpointer data)289 fb_mqtt_cb_timeout(gpointer data)
290 {
291 FbMqtt *mqtt = data;
292 FbMqttPrivate *priv = mqtt->priv;
293
294 priv->tev = 0;
295 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, _("Connection timed out"));
296 return FALSE;
297 }
298
299 static void
fb_mqtt_timeout_clear(FbMqtt * mqtt)300 fb_mqtt_timeout_clear(FbMqtt *mqtt)
301 {
302 FbMqttPrivate *priv = mqtt->priv;
303
304 if (priv->tev > 0) {
305 g_source_remove(priv->tev);
306 priv->tev = 0;
307 }
308 }
309
310 static void
fb_mqtt_timeout(FbMqtt * mqtt)311 fb_mqtt_timeout(FbMqtt *mqtt)
312 {
313 FbMqttPrivate *priv = mqtt->priv;
314
315 fb_mqtt_timeout_clear(mqtt);
316 priv->tev = g_timeout_add(FB_MQTT_TIMEOUT_CONN,
317 fb_mqtt_cb_timeout, mqtt);
318 }
319
320 static gboolean
fb_mqtt_cb_ping(gpointer data)321 fb_mqtt_cb_ping(gpointer data)
322 {
323 FbMqtt *mqtt = data;
324 FbMqttMessage *msg;
325 FbMqttPrivate *priv = mqtt->priv;
326
327 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ, 0);
328 fb_mqtt_write(mqtt, msg);
329 g_object_unref(msg);
330
331 priv->tev = 0;
332 fb_mqtt_timeout(mqtt);
333 return FALSE;
334 }
335
336 static void
fb_mqtt_ping(FbMqtt * mqtt)337 fb_mqtt_ping(FbMqtt *mqtt)
338 {
339 FbMqttPrivate *priv = mqtt->priv;
340
341 fb_mqtt_timeout_clear(mqtt);
342 priv->tev = g_timeout_add(FB_MQTT_TIMEOUT_PING,
343 fb_mqtt_cb_ping, mqtt);
344 }
345
346 static void
fb_mqtt_cb_read(gpointer data,gint fd,PurpleInputCondition cond)347 fb_mqtt_cb_read(gpointer data, gint fd, PurpleInputCondition cond)
348 {
349 FbMqtt *mqtt = data;
350 FbMqttMessage *msg;
351 FbMqttPrivate *priv = mqtt->priv;
352 gint res;
353 guint mult;
354 guint8 buf[1024];
355 guint8 byte;
356 gsize size;
357 gssize rize;
358
359 if (priv->remz < 1) {
360 /* Reset the read buffer */
361 g_byte_array_set_size(priv->rbuf, 0);
362
363 res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
364
365 if (res < 0 && errno == EAGAIN) {
366 return;
367 } else if (res != 1) {
368 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
369 _("Failed to read fixed header"));
370 return;
371 }
372
373 g_byte_array_append(priv->rbuf, &byte, sizeof byte);
374
375 mult = 1;
376
377 do {
378 res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
379
380 /* TODO: this case isn't handled yet */
381 if (0 && res < 0 && errno == EAGAIN) {
382 return;
383 } else if (res != 1) {
384 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
385 _("Failed to read packet size"));
386 return;
387 }
388
389 g_byte_array_append(priv->rbuf, &byte, sizeof byte);
390
391 priv->remz += (byte & 127) * mult;
392 mult *= 128;
393 } while ((byte & 128) != 0);
394 }
395
396 if (priv->remz > 0) {
397 size = MIN(priv->remz, sizeof buf);
398 rize = purple_ssl_read(priv->gsc, buf, size);
399
400 if (rize < 0 && errno == EAGAIN) {
401 return;
402 } else if (rize < 1) {
403 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
404 _("Failed to read packet data"));
405 return;
406 }
407
408 g_byte_array_append(priv->rbuf, buf, rize);
409 priv->remz -= rize;
410 }
411
412 if (priv->remz < 1) {
413 msg = fb_mqtt_message_new_bytes(priv->rbuf);
414 priv->remz = 0;
415
416 if (G_UNLIKELY(msg == NULL)) {
417 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
418 _("Failed to parse message"));
419 return;
420 }
421
422 fb_mqtt_read(mqtt, msg);
423 g_object_unref(msg);
424 }
425 }
426
427 void
fb_mqtt_read(FbMqtt * mqtt,FbMqttMessage * msg)428 fb_mqtt_read(FbMqtt *mqtt, FbMqttMessage *msg)
429 {
430 FbMqttMessage *nsg;
431 FbMqttPrivate *priv;
432 FbMqttMessagePrivate *mriv;
433 GByteArray *wytes;
434 gchar *str;
435 guint8 chr;
436 guint16 mid;
437
438 g_return_if_fail(FB_IS_MQTT(mqtt));
439 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
440 priv = mqtt->priv;
441 mriv = msg->priv;
442
443 fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
444 "Reading %d (flags: 0x%0X)",
445 mriv->type, mriv->flags);
446
447 switch (mriv->type) {
448 case FB_MQTT_MESSAGE_TYPE_CONNACK:
449 if (!fb_mqtt_message_read_byte(msg, NULL) ||
450 !fb_mqtt_message_read_byte(msg, &chr))
451 {
452 break;
453 }
454
455 if (chr != FB_MQTT_ERROR_SUCCESS) {
456 fb_mqtt_error(mqtt, chr, _("Connection failed (%u)"),
457 chr);
458 return;
459 }
460
461 priv->connected = TRUE;
462 fb_mqtt_ping(mqtt);
463 g_signal_emit_by_name(mqtt, "connect");
464 return;
465
466 case FB_MQTT_MESSAGE_TYPE_PUBLISH:
467 if (!fb_mqtt_message_read_str(msg, &str)) {
468 break;
469 }
470
471 if ((mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) ||
472 (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS2))
473 {
474 if (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) {
475 chr = FB_MQTT_MESSAGE_TYPE_PUBACK;
476 } else {
477 chr = FB_MQTT_MESSAGE_TYPE_PUBREC;
478 }
479
480 if (!fb_mqtt_message_read_mid(msg, &mid)) {
481 g_free(str);
482 break;
483 }
484
485 nsg = fb_mqtt_message_new(chr, 0);
486 fb_mqtt_message_write_u16(nsg, mid);
487 fb_mqtt_write(mqtt, nsg);
488 g_object_unref(nsg);
489 }
490
491 wytes = g_byte_array_new();
492 fb_mqtt_message_read_r(msg, wytes);
493 g_signal_emit_by_name(mqtt, "publish", str, wytes);
494 g_byte_array_free(wytes, TRUE);
495 g_free(str);
496 return;
497
498 case FB_MQTT_MESSAGE_TYPE_PUBREL:
499 if (!fb_mqtt_message_read_mid(msg, &mid)) {
500 break;
501 }
502
503 nsg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBCOMP, 0);
504 fb_mqtt_message_write_u16(nsg, mid); /* Message identifier */
505 fb_mqtt_write(mqtt, nsg);
506 g_object_unref(nsg);
507 return;
508
509 case FB_MQTT_MESSAGE_TYPE_PINGRESP:
510 fb_mqtt_ping(mqtt);
511 return;
512
513 case FB_MQTT_MESSAGE_TYPE_PUBACK:
514 case FB_MQTT_MESSAGE_TYPE_PUBCOMP:
515 case FB_MQTT_MESSAGE_TYPE_SUBACK:
516 case FB_MQTT_MESSAGE_TYPE_UNSUBACK:
517 return;
518
519 default:
520 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
521 _("Unknown packet (%u)"), mriv->type);
522 return;
523 }
524
525 /* Since no case returned, there was a parse error. */
526 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
527 _("Failed to parse message"));
528 }
529
530 static void
fb_mqtt_cb_write(gpointer data,gint fd,PurpleInputCondition cond)531 fb_mqtt_cb_write(gpointer data, gint fd, PurpleInputCondition cond)
532 {
533 FbMqtt *mqtt = data;
534 FbMqttPrivate *priv = mqtt->priv;
535 gssize wize;
536
537 wize = purple_ssl_write(priv->gsc, priv->wbuf->data, priv->wbuf->len);
538
539 if (wize < 0) {
540 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
541 _("Failed to write data"));
542 return;
543 }
544
545 if (wize > 0) {
546 g_byte_array_remove_range(priv->wbuf, 0, wize);
547 }
548
549 if (priv->wbuf->len < 1) {
550 priv->wev = 0;
551 }
552 }
553
554 void
fb_mqtt_write(FbMqtt * mqtt,FbMqttMessage * msg)555 fb_mqtt_write(FbMqtt *mqtt, FbMqttMessage *msg)
556 {
557 const GByteArray *bytes;
558 FbMqttMessagePrivate *mriv;
559 FbMqttPrivate *priv;
560
561 g_return_if_fail(FB_IS_MQTT(mqtt));
562 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
563 priv = mqtt->priv;
564 mriv = msg->priv;
565
566 bytes = fb_mqtt_message_bytes(msg);
567
568 if (G_UNLIKELY(bytes == NULL)) {
569 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
570 _("Failed to format data"));
571 return;
572 }
573
574 fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
575 "Writing %d (flags: 0x%0X)",
576 mriv->type, mriv->flags);
577
578 g_byte_array_append(priv->wbuf, bytes->data, bytes->len);
579 fb_mqtt_cb_write(mqtt, priv->gsc->fd, PURPLE_INPUT_WRITE);
580
581 if (priv->wev > 0) {
582 priv->wev = purple_input_add(priv->gsc->fd,
583 PURPLE_INPUT_WRITE,
584 fb_mqtt_cb_write, mqtt);
585 }
586 }
587
588 static void
fb_mqtt_cb_open(gpointer data,PurpleSslConnection * ssl,PurpleInputCondition cond)589 fb_mqtt_cb_open(gpointer data, PurpleSslConnection *ssl,
590 PurpleInputCondition cond)
591 {
592 FbMqtt *mqtt = data;
593 FbMqttPrivate *priv = mqtt->priv;
594
595 fb_mqtt_timeout_clear(mqtt);
596 priv->rev = purple_input_add(priv->gsc->fd, PURPLE_INPUT_READ,
597 fb_mqtt_cb_read, mqtt);
598 g_signal_emit_by_name(mqtt, "open");
599 }
600
601 static void
fb_mqtt_cb_open_error(PurpleSslConnection * ssl,PurpleSslErrorType error,gpointer data)602 fb_mqtt_cb_open_error(PurpleSslConnection *ssl, PurpleSslErrorType error,
603 gpointer data)
604 {
605 const gchar *str;
606 FbMqtt *mqtt = data;
607 FbMqttPrivate *priv = mqtt->priv;
608 GError *err;
609
610 str = purple_ssl_strerror(error);
611 err = g_error_new_literal(FB_MQTT_SSL_ERROR, error, str);
612
613 /* Do not call purple_ssl_close() from the error_func */
614 priv->gsc = NULL;
615
616 g_signal_emit_by_name(mqtt, "error", err);
617 g_error_free(err);
618 }
619
620 void
fb_mqtt_open(FbMqtt * mqtt,const gchar * host,gint port)621 fb_mqtt_open(FbMqtt *mqtt, const gchar *host, gint port)
622 {
623 FbMqttPrivate *priv;
624 PurpleAccount *acc;
625
626 g_return_if_fail(FB_IS_MQTT(mqtt));
627 priv = mqtt->priv;
628
629 acc = purple_connection_get_account(priv->gc);
630 fb_mqtt_close(mqtt);
631 priv->gsc = purple_ssl_connect(acc, host, port, fb_mqtt_cb_open,
632 fb_mqtt_cb_open_error, mqtt);
633
634 if (priv->gsc == NULL) {
635 fb_mqtt_cb_open_error(NULL, 0, mqtt);
636 return;
637 }
638
639 fb_mqtt_timeout(mqtt);
640 }
641
642 void
fb_mqtt_connect(FbMqtt * mqtt,guint8 flags,const GByteArray * pload)643 fb_mqtt_connect(FbMqtt *mqtt, guint8 flags, const GByteArray *pload)
644 {
645 FbMqttMessage *msg;
646
647 g_return_if_fail(!fb_mqtt_connected(mqtt, FALSE));
648 g_return_if_fail(pload != NULL);
649
650 /* Facebook always sends a CONNACK, use QoS1 */
651 flags |= FB_MQTT_CONNECT_FLAG_QOS1;
652
653 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_CONNECT, 0);
654 fb_mqtt_message_write_str(msg, FB_MQTT_NAME); /* Protocol name */
655 fb_mqtt_message_write_byte(msg, FB_MQTT_LEVEL); /* Protocol level */
656 fb_mqtt_message_write_byte(msg, flags); /* Flags */
657 fb_mqtt_message_write_u16(msg, FB_MQTT_KA); /* Keep alive */
658
659 fb_mqtt_message_write(msg, pload->data, pload->len);
660 fb_mqtt_write(mqtt, msg);
661
662 fb_mqtt_timeout(mqtt);
663 g_object_unref(msg);
664 }
665
666 gboolean
fb_mqtt_connected(FbMqtt * mqtt,gboolean error)667 fb_mqtt_connected(FbMqtt *mqtt, gboolean error)
668 {
669 FbMqttPrivate *priv;
670 gboolean connected;
671
672 g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
673 priv = mqtt->priv;
674 connected = (priv->gsc != NULL) && priv->connected;
675
676 if (!connected && error) {
677 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
678 _("Not connected"));
679 }
680
681 return connected;
682 }
683
684 void
fb_mqtt_disconnect(FbMqtt * mqtt)685 fb_mqtt_disconnect(FbMqtt *mqtt)
686 {
687 FbMqttMessage *msg;
688
689 if (G_UNLIKELY(!fb_mqtt_connected(mqtt, FALSE))) {
690 return;
691 }
692
693 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_DISCONNECT, 0);
694 fb_mqtt_write(mqtt, msg);
695 g_object_unref(msg);
696 fb_mqtt_close(mqtt);
697 }
698
699 void
fb_mqtt_publish(FbMqtt * mqtt,const gchar * topic,const GByteArray * pload)700 fb_mqtt_publish(FbMqtt *mqtt, const gchar *topic, const GByteArray *pload)
701 {
702 FbMqttMessage *msg;
703 FbMqttPrivate *priv;
704
705 g_return_if_fail(FB_IS_MQTT(mqtt));
706 g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
707 priv = mqtt->priv;
708
709 /* Message identifier not required, but for consistency use QoS1 */
710 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH,
711 FB_MQTT_MESSAGE_FLAG_QOS1);
712
713 fb_mqtt_message_write_str(msg, topic); /* Message topic */
714 fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
715
716 if (pload != NULL) {
717 fb_mqtt_message_write(msg, pload->data, pload->len);
718 }
719
720 fb_mqtt_write(mqtt, msg);
721 g_object_unref(msg);
722 }
723
724 void
fb_mqtt_subscribe(FbMqtt * mqtt,const gchar * topic1,guint16 qos1,...)725 fb_mqtt_subscribe(FbMqtt *mqtt, const gchar *topic1, guint16 qos1, ...)
726 {
727 const gchar *topic;
728 FbMqttMessage *msg;
729 FbMqttPrivate *priv;
730 guint16 qos;
731 va_list ap;
732
733 g_return_if_fail(FB_IS_MQTT(mqtt));
734 g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
735 priv = mqtt->priv;
736
737 /* Facebook requires a message identifier, use QoS1 */
738 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE,
739 FB_MQTT_MESSAGE_FLAG_QOS1);
740
741 fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
742 fb_mqtt_message_write_str(msg, topic1); /* First topics */
743 fb_mqtt_message_write_byte(msg, qos1); /* First QoS value */
744
745 va_start(ap, qos1);
746
747 while ((topic = va_arg(ap, const gchar*)) != NULL) {
748 qos = va_arg(ap, guint);
749 fb_mqtt_message_write_str(msg, topic); /* Remaining topics */
750 fb_mqtt_message_write_byte(msg, qos); /* Remaining QoS values */
751 }
752
753 va_end(ap);
754
755 fb_mqtt_write(mqtt, msg);
756 g_object_unref(msg);
757 }
758
759 void
fb_mqtt_unsubscribe(FbMqtt * mqtt,const gchar * topic1,...)760 fb_mqtt_unsubscribe(FbMqtt *mqtt, const gchar *topic1, ...)
761 {
762 const gchar *topic;
763 FbMqttMessage *msg;
764 FbMqttPrivate *priv;
765 va_list ap;
766
767 g_return_if_fail(FB_IS_MQTT(mqtt));
768 g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
769 priv = mqtt->priv;
770
771 /* Facebook requires a message identifier, use QoS1 */
772 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE,
773 FB_MQTT_MESSAGE_FLAG_QOS1);
774
775 fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
776 fb_mqtt_message_write_str(msg, topic1); /* First topic */
777
778 va_start(ap, topic1);
779
780 while ((topic = va_arg(ap, const gchar*)) != NULL) {
781 fb_mqtt_message_write_str(msg, topic); /* Remaining topics */
782 }
783
784 va_end(ap);
785
786 fb_mqtt_write(mqtt, msg);
787 g_object_unref(msg);
788 }
789
790 FbMqttMessage *
fb_mqtt_message_new(FbMqttMessageType type,FbMqttMessageFlags flags)791 fb_mqtt_message_new(FbMqttMessageType type, FbMqttMessageFlags flags)
792 {
793 FbMqttMessage *msg;
794 FbMqttMessagePrivate *priv;
795
796 msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
797 priv = msg->priv;
798
799 priv->type = type;
800 priv->flags = flags;
801 priv->bytes = g_byte_array_new();
802 priv->local = TRUE;
803
804 return msg;
805 }
806
807 FbMqttMessage *
fb_mqtt_message_new_bytes(GByteArray * bytes)808 fb_mqtt_message_new_bytes(GByteArray *bytes)
809 {
810 FbMqttMessage *msg;
811 FbMqttMessagePrivate *priv;
812 guint8 *byte;
813
814 g_return_val_if_fail(bytes != NULL, NULL);
815 g_return_val_if_fail(bytes->len >= 2, NULL);
816
817 msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
818 priv = msg->priv;
819
820 priv->bytes = bytes;
821 priv->local = FALSE;
822 priv->type = (*bytes->data & 0xF0) >> 4;
823 priv->flags = *bytes->data & 0x0F;
824
825 /* Skip the fixed header */
826 for (byte = priv->bytes->data + 1; (*(byte++) & 128) != 0; );
827 priv->offset = byte - bytes->data;
828 priv->pos = priv->offset;
829
830 return msg;
831 }
832
833 void
fb_mqtt_message_reset(FbMqttMessage * msg)834 fb_mqtt_message_reset(FbMqttMessage *msg)
835 {
836 FbMqttMessagePrivate *priv;
837
838 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
839 priv = msg->priv;
840
841 if (priv->offset > 0) {
842 g_byte_array_remove_range(priv->bytes, 0, priv->offset);
843 priv->offset = 0;
844 priv->pos = 0;
845 }
846 }
847
848 const GByteArray *
fb_mqtt_message_bytes(FbMqttMessage * msg)849 fb_mqtt_message_bytes(FbMqttMessage *msg)
850 {
851 FbMqttMessagePrivate *priv;
852 guint i;
853 guint8 byte;
854 guint8 sbuf[4];
855 guint32 size;
856
857 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), NULL);
858 priv = msg->priv;
859
860 i = 0;
861 size = priv->bytes->len - priv->offset;
862
863 do {
864 if (G_UNLIKELY(i >= G_N_ELEMENTS(sbuf))) {
865 return NULL;
866 }
867
868 byte = size % 128;
869 size /= 128;
870
871 if (size > 0) {
872 byte |= 128;
873 }
874
875 sbuf[i++] = byte;
876 } while (size > 0);
877
878 fb_mqtt_message_reset(msg);
879 g_byte_array_prepend(priv->bytes, sbuf, i);
880
881 byte = ((priv->type & 0x0F) << 4) | (priv->flags & 0x0F);
882 g_byte_array_prepend(priv->bytes, &byte, sizeof byte);
883
884 priv->pos = (i + 1) * (sizeof byte);
885 return priv->bytes;
886 }
887
888 gboolean
fb_mqtt_message_read(FbMqttMessage * msg,gpointer data,guint size)889 fb_mqtt_message_read(FbMqttMessage *msg, gpointer data, guint size)
890 {
891 FbMqttMessagePrivate *priv;
892
893 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
894 priv = msg->priv;
895
896 if ((priv->pos + size) > priv->bytes->len) {
897 return FALSE;
898 }
899
900 if ((data != NULL) && (size > 0)) {
901 memcpy(data, priv->bytes->data + priv->pos, size);
902 }
903
904 priv->pos += size;
905 return TRUE;
906 }
907
908 gboolean
fb_mqtt_message_read_r(FbMqttMessage * msg,GByteArray * bytes)909 fb_mqtt_message_read_r(FbMqttMessage *msg, GByteArray *bytes)
910 {
911 FbMqttMessagePrivate *priv;
912 guint size;
913
914 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
915 priv = msg->priv;
916 size = priv->bytes->len - priv->pos;
917
918 if (G_LIKELY(size > 0)) {
919 g_byte_array_append(bytes, priv->bytes->data + priv->pos,
920 size);
921 }
922
923 return TRUE;
924 }
925
926 gboolean
fb_mqtt_message_read_byte(FbMqttMessage * msg,guint8 * value)927 fb_mqtt_message_read_byte(FbMqttMessage *msg, guint8 *value)
928 {
929 return fb_mqtt_message_read(msg, value, sizeof *value);
930 }
931
932 gboolean
fb_mqtt_message_read_mid(FbMqttMessage * msg,guint16 * value)933 fb_mqtt_message_read_mid(FbMqttMessage *msg, guint16 *value)
934 {
935 return fb_mqtt_message_read_u16(msg, value);
936 }
937
938 gboolean
fb_mqtt_message_read_u16(FbMqttMessage * msg,guint16 * value)939 fb_mqtt_message_read_u16(FbMqttMessage *msg, guint16 *value)
940 {
941 if (!fb_mqtt_message_read(msg, value, sizeof *value)) {
942 return FALSE;
943 }
944
945 if (value != NULL) {
946 *value = g_ntohs(*value);
947 }
948
949 return TRUE;
950 }
951
952 gboolean
fb_mqtt_message_read_str(FbMqttMessage * msg,gchar ** value)953 fb_mqtt_message_read_str(FbMqttMessage *msg, gchar **value)
954 {
955 guint8 *data;
956 guint16 size;
957
958 if (!fb_mqtt_message_read_u16(msg, &size)) {
959 return FALSE;
960 }
961
962 if (value != NULL) {
963 data = g_new(guint8, size + 1);
964 data[size] = 0;
965 } else {
966 data = NULL;
967 }
968
969 if (!fb_mqtt_message_read(msg, data, size)) {
970 g_free(data);
971 return FALSE;
972 }
973
974 if (value != NULL) {
975 *value = (gchar *) data;
976 }
977
978 return TRUE;
979 }
980
981 void
fb_mqtt_message_write(FbMqttMessage * msg,gconstpointer data,guint size)982 fb_mqtt_message_write(FbMqttMessage *msg, gconstpointer data, guint size)
983 {
984 FbMqttMessagePrivate *priv;
985
986 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
987 priv = msg->priv;
988
989 g_byte_array_append(priv->bytes, data, size);
990 priv->pos += size;
991 }
992
993 void
fb_mqtt_message_write_byte(FbMqttMessage * msg,guint8 value)994 fb_mqtt_message_write_byte(FbMqttMessage *msg, guint8 value)
995 {
996 fb_mqtt_message_write(msg, &value, sizeof value);
997 }
998
999 void
fb_mqtt_message_write_mid(FbMqttMessage * msg,guint16 * value)1000 fb_mqtt_message_write_mid(FbMqttMessage *msg, guint16 *value)
1001 {
1002 g_return_if_fail(value != NULL);
1003 fb_mqtt_message_write_u16(msg, ++(*value));
1004 }
1005
1006 void
fb_mqtt_message_write_u16(FbMqttMessage * msg,guint16 value)1007 fb_mqtt_message_write_u16(FbMqttMessage *msg, guint16 value)
1008 {
1009 value = g_htons(value);
1010 fb_mqtt_message_write(msg, &value, sizeof value);
1011 }
1012
1013 void
fb_mqtt_message_write_str(FbMqttMessage * msg,const gchar * value)1014 fb_mqtt_message_write_str(FbMqttMessage *msg, const gchar *value)
1015 {
1016 gint16 size;
1017
1018 g_return_if_fail(value != NULL);
1019
1020 size = strlen(value);
1021 fb_mqtt_message_write_u16(msg, size);
1022 fb_mqtt_message_write(msg, value, size);
1023 }
1024