1 /* purple
2  *
3  * Purple is the legal property of its developers, whose names are too numerous
4  * to list here.  Please refer to the COPYRIGHT file distributed with this
5  * source distribution.
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111-1301  USA
20  */
21 
22 #include "internal.h"
23 
24 #include <glib/gprintf.h>
25 #include <stdarg.h>
26 #include <string.h>
27 
28 #include "account.h"
29 #include "eventloop.h"
30 #include "glibcompat.h"
31 #include "sslconn.h"
32 
33 #include "marshal.h"
34 #include "mqtt.h"
35 #include "util.h"
36 
/* Private state of an #FbMqtt instance. */
struct _FbMqttPrivate
{
	PurpleConnection *gc;     /* Connection passed to fb_mqtt_new() */
	PurpleSslConnection *gsc; /* SSL connection; NULL while closed */
	gboolean connected;       /* Set once a successful CONNACK was read */
	guint16 mid;              /* Last message identifier handed out */

	GByteArray *rbuf; /* Bytes of the packet currently being read */
	GByteArray *wbuf; /* Bytes queued for writing */
	gsize remz;       /* Bytes of the current packet still to be read */

	gint tev; /* Timeout/ping source identifier, 0 when unset */
	gint rev; /* Read watch identifier, 0 when unset */
	gint wev; /* Write watch identifier, 0 when unset */
};
52 
/* Private state of an #FbMqttMessage instance. */
struct _FbMqttMessagePrivate
{
	FbMqttMessageType type;   /* Packet type (high nibble of the first byte) */
	FbMqttMessageFlags flags; /* Packet flags (low nibble of the first byte) */

	GByteArray *bytes; /* Packet data */
	guint offset;      /* Length of the fixed header held in bytes, 0 if none */
	guint pos;         /* Cursor into bytes used by the read/write helpers */

	gboolean local; /* TRUE when bytes is freed together with the message */
};
64 
/* Register both GObject types, each with its private structure attached. */
G_DEFINE_TYPE_WITH_CODE(FbMqtt, fb_mqtt, G_TYPE_OBJECT, G_ADD_PRIVATE(FbMqtt));
G_DEFINE_TYPE_WITH_CODE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT, G_ADD_PRIVATE(FbMqttMessage));
67 
68 static void
fb_mqtt_dispose(GObject * obj)69 fb_mqtt_dispose(GObject *obj)
70 {
71 	FbMqtt *mqtt = FB_MQTT(obj);
72 	FbMqttPrivate *priv = mqtt->priv;
73 
74 	fb_mqtt_close(mqtt);
75 	g_byte_array_free(priv->rbuf, TRUE);
76 	g_byte_array_free(priv->wbuf, TRUE);
77 }
78 
79 static void
fb_mqtt_class_init(FbMqttClass * klass)80 fb_mqtt_class_init(FbMqttClass *klass)
81 {
82 	GObjectClass *gklass = G_OBJECT_CLASS(klass);
83 
84 	gklass->dispose = fb_mqtt_dispose;
85 	g_type_class_add_private(klass, sizeof (FbMqttPrivate));
86 
87 	/**
88 	 * FbMqtt::connect:
89 	 * @mqtt: The #FbMqtt.
90 	 *
91 	 * Emitted upon the successful completion of the connection
92 	 * process. This is emitted as a result of #fb_mqtt_connect().
93 	 */
94 	g_signal_new("connect",
95 	             G_TYPE_FROM_CLASS(klass),
96 	             G_SIGNAL_ACTION,
97 	             0,
98 	             NULL, NULL,
99 	             fb_marshal_VOID__VOID,
100 	             G_TYPE_NONE,
101 	             0);
102 
103 	/**
104 	 * FbMqtt::error:
105 	 * @mqtt: The #FbMqtt.
106 	 * @error: The #GError.
107 	 *
108 	 * Emitted whenever an error is hit within the #FbMqtt. This
109 	 * should close the #FbMqtt with #fb_mqtt_close().
110 	 */
111 	g_signal_new("error",
112 	             G_TYPE_FROM_CLASS(klass),
113 	             G_SIGNAL_ACTION,
114 	             0,
115 	             NULL, NULL,
116 	             fb_marshal_VOID__POINTER,
117 	             G_TYPE_NONE,
118 	             1, G_TYPE_POINTER);
119 
120 	/**
121 	 * FbMqtt::open:
122 	 * @mqtt: The #FbMqtt.
123 	 *
124 	 * Emitted upon the successful opening of the remote socket.
125 	 * This is emitted as a result of #fb_mqtt_open(). This should
126 	 * call #fb_mqtt_connect().
127 	 */
128 	g_signal_new("open",
129 	             G_TYPE_FROM_CLASS(klass),
130 	             G_SIGNAL_ACTION,
131 	             0,
132 	             NULL, NULL,
133 	             fb_marshal_VOID__VOID,
134 	             G_TYPE_NONE,
135 	             0);
136 
137 	/**
138 	 * FbMqtt::publish:
139 	 * @mqtt: The #FbMqtt.
140 	 * @topic: The topic.
141 	 * @pload: The payload.
142 	 *
143 	 * Emitted upon an incoming message from the steam.
144 	 */
145 	g_signal_new("publish",
146 	             G_TYPE_FROM_CLASS(klass),
147 	             G_SIGNAL_ACTION,
148 	             0,
149 	             NULL, NULL,
150 	             fb_marshal_VOID__STRING_BOXED,
151 	             G_TYPE_NONE,
152 	             2, G_TYPE_STRING, G_TYPE_BYTE_ARRAY);
153 }
154 
155 static void
fb_mqtt_init(FbMqtt * mqtt)156 fb_mqtt_init(FbMqtt *mqtt)
157 {
158 	FbMqttPrivate *priv;
159 
160 	priv = G_TYPE_INSTANCE_GET_PRIVATE(mqtt, FB_TYPE_MQTT, FbMqttPrivate);
161 	mqtt->priv = priv;
162 
163 	priv->rbuf = g_byte_array_new();
164 	priv->wbuf = g_byte_array_new();
165 }
166 
167 static void
fb_mqtt_message_dispose(GObject * obj)168 fb_mqtt_message_dispose(GObject *obj)
169 {
170 	FbMqttMessagePrivate *priv = FB_MQTT_MESSAGE(obj)->priv;
171 
172 	if ((priv->bytes != NULL) && priv->local) {
173 		g_byte_array_free(priv->bytes, TRUE);
174 	}
175 }
176 
177 static void
fb_mqtt_message_class_init(FbMqttMessageClass * klass)178 fb_mqtt_message_class_init(FbMqttMessageClass *klass)
179 {
180 	GObjectClass *gklass = G_OBJECT_CLASS(klass);
181 
182 	gklass->dispose = fb_mqtt_message_dispose;
183 	g_type_class_add_private(klass, sizeof (FbMqttMessagePrivate));
184 }
185 
186 static void
fb_mqtt_message_init(FbMqttMessage * msg)187 fb_mqtt_message_init(FbMqttMessage *msg)
188 {
189 	FbMqttMessagePrivate *priv;
190 
191 	priv = G_TYPE_INSTANCE_GET_PRIVATE(msg, FB_TYPE_MQTT_MESSAGE,
192 	                                   FbMqttMessagePrivate);
193 	msg->priv = priv;
194 }
195 
196 GQuark
fb_mqtt_error_quark(void)197 fb_mqtt_error_quark(void)
198 {
199 	static GQuark q = 0;
200 
201 	if (G_UNLIKELY(q == 0)) {
202 		q = g_quark_from_static_string("fb-mqtt-error-quark");
203 	}
204 
205 	return q;
206 }
207 
208 GQuark
fb_mqtt_ssl_error_quark(void)209 fb_mqtt_ssl_error_quark(void)
210 {
211 	static GQuark q = 0;
212 
213 	if (G_UNLIKELY(q == 0)) {
214 		q = g_quark_from_static_string("fb-mqtt-ssl-error-quark");
215 	}
216 
217 	return q;
218 }
219 
220 FbMqtt *
fb_mqtt_new(PurpleConnection * gc)221 fb_mqtt_new(PurpleConnection *gc)
222 {
223 	FbMqtt *mqtt;
224 	FbMqttPrivate *priv;
225 
226 	g_return_val_if_fail(PURPLE_IS_CONNECTION(gc), NULL);
227 
228 	mqtt = g_object_new(FB_TYPE_MQTT,  NULL);
229 	priv = mqtt->priv;
230 	priv->gc = gc;
231 
232 	return mqtt;
233 };
234 
235 void
fb_mqtt_close(FbMqtt * mqtt)236 fb_mqtt_close(FbMqtt *mqtt)
237 {
238 	FbMqttPrivate *priv;
239 
240 	g_return_if_fail(FB_IS_MQTT(mqtt));
241 	priv = mqtt->priv;
242 
243 	if (priv->wev > 0) {
244 		purple_input_remove(priv->wev);
245 		priv->wev = 0;
246 	}
247 
248 	if (priv->rev > 0) {
249 		purple_input_remove(priv->rev);
250 		priv->rev = 0;
251 	}
252 
253 	if (priv->tev > 0) {
254 		purple_timeout_remove(priv->tev);
255 		priv->tev = 0;
256 	}
257 
258 	if (priv->gsc != NULL) {
259 		purple_ssl_close(priv->gsc);
260 		priv->gsc = NULL;
261 	}
262 
263 	if (priv->wbuf->len > 0) {
264 		fb_util_debug_warning("Closing with unwritten data");
265 	}
266 
267 	priv->connected = FALSE;
268 	g_byte_array_set_size(priv->rbuf, 0);
269 	g_byte_array_set_size(priv->wbuf, 0);
270 }
271 
272 void
fb_mqtt_error(FbMqtt * mqtt,FbMqttError error,const gchar * format,...)273 fb_mqtt_error(FbMqtt *mqtt, FbMqttError error, const gchar *format, ...)
274 {
275 	GError *err;
276 	va_list ap;
277 
278 	g_return_if_fail(FB_IS_MQTT(mqtt));
279 
280 	va_start(ap, format);
281 	err = g_error_new_valist(FB_MQTT_ERROR, error, format, ap);
282 	va_end(ap);
283 
284 	g_signal_emit_by_name(mqtt, "error", err);
285 	g_error_free(err);
286 }
287 
288 static gboolean
fb_mqtt_cb_timeout(gpointer data)289 fb_mqtt_cb_timeout(gpointer data)
290 {
291 	FbMqtt *mqtt = data;
292 	FbMqttPrivate *priv = mqtt->priv;
293 
294 	priv->tev = 0;
295 	fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, _("Connection timed out"));
296 	return FALSE;
297 }
298 
299 static void
fb_mqtt_timeout_clear(FbMqtt * mqtt)300 fb_mqtt_timeout_clear(FbMqtt *mqtt)
301 {
302 	FbMqttPrivate *priv = mqtt->priv;
303 
304 	if (priv->tev > 0) {
305 		g_source_remove(priv->tev);
306 		priv->tev = 0;
307 	}
308 }
309 
310 static void
fb_mqtt_timeout(FbMqtt * mqtt)311 fb_mqtt_timeout(FbMqtt *mqtt)
312 {
313 	FbMqttPrivate *priv = mqtt->priv;
314 
315 	fb_mqtt_timeout_clear(mqtt);
316 	priv->tev = g_timeout_add(FB_MQTT_TIMEOUT_CONN,
317 	                               fb_mqtt_cb_timeout, mqtt);
318 }
319 
320 static gboolean
fb_mqtt_cb_ping(gpointer data)321 fb_mqtt_cb_ping(gpointer data)
322 {
323 	FbMqtt *mqtt = data;
324 	FbMqttMessage *msg;
325 	FbMqttPrivate *priv = mqtt->priv;
326 
327 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ, 0);
328 	fb_mqtt_write(mqtt, msg);
329 	g_object_unref(msg);
330 
331 	priv->tev = 0;
332 	fb_mqtt_timeout(mqtt);
333 	return FALSE;
334 }
335 
336 static void
fb_mqtt_ping(FbMqtt * mqtt)337 fb_mqtt_ping(FbMqtt *mqtt)
338 {
339 	FbMqttPrivate *priv = mqtt->priv;
340 
341 	fb_mqtt_timeout_clear(mqtt);
342 	priv->tev = g_timeout_add(FB_MQTT_TIMEOUT_PING,
343 	                               fb_mqtt_cb_ping, mqtt);
344 }
345 
346 static void
fb_mqtt_cb_read(gpointer data,gint fd,PurpleInputCondition cond)347 fb_mqtt_cb_read(gpointer data, gint fd, PurpleInputCondition cond)
348 {
349 	FbMqtt *mqtt = data;
350 	FbMqttMessage *msg;
351 	FbMqttPrivate *priv = mqtt->priv;
352 	gint res;
353 	guint mult;
354 	guint8 buf[1024];
355 	guint8 byte;
356 	gsize size;
357 	gssize rize;
358 
359 	if (priv->remz < 1) {
360 		/* Reset the read buffer */
361 		g_byte_array_set_size(priv->rbuf, 0);
362 
363 		res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
364 
365 		if (res < 0 && errno == EAGAIN) {
366 			return;
367 		} else if (res != 1) {
368 			fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
369 			              _("Failed to read fixed header"));
370 			return;
371 		}
372 
373 		g_byte_array_append(priv->rbuf, &byte, sizeof byte);
374 
375 		mult = 1;
376 
377 		do {
378 			res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
379 
380 			/* TODO: this case isn't handled yet */
381 			if (0 && res < 0 && errno == EAGAIN) {
382 				return;
383 			} else if (res != 1) {
384 				fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
385 				              _("Failed to read packet size"));
386 				return;
387 			}
388 
389 			g_byte_array_append(priv->rbuf, &byte, sizeof byte);
390 
391 			priv->remz += (byte & 127) * mult;
392 			mult *= 128;
393 		} while ((byte & 128) != 0);
394 	}
395 
396 	if (priv->remz > 0) {
397 		size = MIN(priv->remz, sizeof buf);
398 		rize = purple_ssl_read(priv->gsc, buf, size);
399 
400 		if (rize < 0 && errno == EAGAIN) {
401 			return;
402 		} else if (rize < 1) {
403 			fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
404 			              _("Failed to read packet data"));
405 			return;
406 		}
407 
408 		g_byte_array_append(priv->rbuf, buf, rize);
409 		priv->remz -= rize;
410 	}
411 
412 	if (priv->remz < 1) {
413 		msg = fb_mqtt_message_new_bytes(priv->rbuf);
414 		priv->remz = 0;
415 
416 		if (G_UNLIKELY(msg == NULL)) {
417 			fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
418 			              _("Failed to parse message"));
419 			return;
420 		}
421 
422 		fb_mqtt_read(mqtt, msg);
423 		g_object_unref(msg);
424 	}
425 }
426 
427 void
fb_mqtt_read(FbMqtt * mqtt,FbMqttMessage * msg)428 fb_mqtt_read(FbMqtt *mqtt, FbMqttMessage *msg)
429 {
430 	FbMqttMessage *nsg;
431 	FbMqttPrivate *priv;
432 	FbMqttMessagePrivate *mriv;
433 	GByteArray *wytes;
434 	gchar *str;
435 	guint8 chr;
436 	guint16 mid;
437 
438 	g_return_if_fail(FB_IS_MQTT(mqtt));
439 	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
440 	priv = mqtt->priv;
441 	mriv = msg->priv;
442 
443 	fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
444 	                      "Reading %d (flags: 0x%0X)",
445 			      mriv->type, mriv->flags);
446 
447 	switch (mriv->type) {
448 	case FB_MQTT_MESSAGE_TYPE_CONNACK:
449 		if (!fb_mqtt_message_read_byte(msg, NULL) ||
450 		    !fb_mqtt_message_read_byte(msg, &chr))
451 		{
452 			break;
453 		}
454 
455 		if (chr != FB_MQTT_ERROR_SUCCESS) {
456 			fb_mqtt_error(mqtt, chr, _("Connection failed (%u)"),
457 			              chr);
458 			return;
459 		}
460 
461 		priv->connected = TRUE;
462 		fb_mqtt_ping(mqtt);
463 		g_signal_emit_by_name(mqtt, "connect");
464 		return;
465 
466 	case FB_MQTT_MESSAGE_TYPE_PUBLISH:
467 		if (!fb_mqtt_message_read_str(msg, &str)) {
468 			break;
469 		}
470 
471 		if ((mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) ||
472 		    (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS2))
473 		{
474 			if (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) {
475 				chr = FB_MQTT_MESSAGE_TYPE_PUBACK;
476 			} else {
477 				chr = FB_MQTT_MESSAGE_TYPE_PUBREC;
478 			}
479 
480 			if (!fb_mqtt_message_read_mid(msg, &mid)) {
481 				g_free(str);
482 				break;
483 			}
484 
485 			nsg = fb_mqtt_message_new(chr, 0);
486 			fb_mqtt_message_write_u16(nsg, mid);
487 			fb_mqtt_write(mqtt, nsg);
488 			g_object_unref(nsg);
489 		}
490 
491 		wytes = g_byte_array_new();
492 		fb_mqtt_message_read_r(msg, wytes);
493 		g_signal_emit_by_name(mqtt, "publish", str, wytes);
494 		g_byte_array_free(wytes, TRUE);
495 		g_free(str);
496 		return;
497 
498 	case FB_MQTT_MESSAGE_TYPE_PUBREL:
499 		if (!fb_mqtt_message_read_mid(msg, &mid)) {
500 			break;
501 		}
502 
503 		nsg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBCOMP, 0);
504 		fb_mqtt_message_write_u16(nsg, mid); /* Message identifier */
505 		fb_mqtt_write(mqtt, nsg);
506 		g_object_unref(nsg);
507 		return;
508 
509 	case FB_MQTT_MESSAGE_TYPE_PINGRESP:
510 		fb_mqtt_ping(mqtt);
511 		return;
512 
513 	case FB_MQTT_MESSAGE_TYPE_PUBACK:
514 	case FB_MQTT_MESSAGE_TYPE_PUBCOMP:
515 	case FB_MQTT_MESSAGE_TYPE_SUBACK:
516 	case FB_MQTT_MESSAGE_TYPE_UNSUBACK:
517 		return;
518 
519 	default:
520 		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
521 		              _("Unknown packet (%u)"), mriv->type);
522 		return;
523 	}
524 
525 	/* Since no case returned, there was a parse error. */
526 	fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
527 	              _("Failed to parse message"));
528 }
529 
530 static void
fb_mqtt_cb_write(gpointer data,gint fd,PurpleInputCondition cond)531 fb_mqtt_cb_write(gpointer data, gint fd, PurpleInputCondition cond)
532 {
533 	FbMqtt *mqtt = data;
534 	FbMqttPrivate *priv = mqtt->priv;
535 	gssize wize;
536 
537 	wize = purple_ssl_write(priv->gsc, priv->wbuf->data, priv->wbuf->len);
538 
539 	if (wize < 0) {
540 		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
541 		              _("Failed to write data"));
542 		return;
543 	}
544 
545 	if (wize > 0) {
546 		g_byte_array_remove_range(priv->wbuf, 0, wize);
547 	}
548 
549 	if (priv->wbuf->len < 1) {
550 		priv->wev = 0;
551 	}
552 }
553 
554 void
fb_mqtt_write(FbMqtt * mqtt,FbMqttMessage * msg)555 fb_mqtt_write(FbMqtt *mqtt, FbMqttMessage *msg)
556 {
557 	const GByteArray *bytes;
558 	FbMqttMessagePrivate *mriv;
559 	FbMqttPrivate *priv;
560 
561 	g_return_if_fail(FB_IS_MQTT(mqtt));
562 	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
563 	priv = mqtt->priv;
564 	mriv = msg->priv;
565 
566 	bytes = fb_mqtt_message_bytes(msg);
567 
568 	if (G_UNLIKELY(bytes == NULL)) {
569 		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
570 		              _("Failed to format data"));
571 		return;
572 	}
573 
574 	fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
575 	                      "Writing %d (flags: 0x%0X)",
576 		              mriv->type, mriv->flags);
577 
578 	g_byte_array_append(priv->wbuf, bytes->data, bytes->len);
579 	fb_mqtt_cb_write(mqtt, priv->gsc->fd, PURPLE_INPUT_WRITE);
580 
581 	if (priv->wev > 0) {
582 		priv->wev = purple_input_add(priv->gsc->fd,
583 		                             PURPLE_INPUT_WRITE,
584 		                             fb_mqtt_cb_write, mqtt);
585 	}
586 }
587 
588 static void
fb_mqtt_cb_open(gpointer data,PurpleSslConnection * ssl,PurpleInputCondition cond)589 fb_mqtt_cb_open(gpointer data, PurpleSslConnection *ssl,
590                 PurpleInputCondition cond)
591 {
592 	FbMqtt *mqtt = data;
593 	FbMqttPrivate *priv = mqtt->priv;
594 
595 	fb_mqtt_timeout_clear(mqtt);
596 	priv->rev = purple_input_add(priv->gsc->fd, PURPLE_INPUT_READ,
597 	                             fb_mqtt_cb_read, mqtt);
598 	g_signal_emit_by_name(mqtt, "open");
599 }
600 
601 static void
fb_mqtt_cb_open_error(PurpleSslConnection * ssl,PurpleSslErrorType error,gpointer data)602 fb_mqtt_cb_open_error(PurpleSslConnection *ssl, PurpleSslErrorType error,
603                       gpointer data)
604 {
605 	const gchar *str;
606 	FbMqtt *mqtt = data;
607 	FbMqttPrivate *priv = mqtt->priv;
608 	GError *err;
609 
610 	str = purple_ssl_strerror(error);
611 	err = g_error_new_literal(FB_MQTT_SSL_ERROR, error, str);
612 
613 	/* Do not call purple_ssl_close() from the error_func */
614 	priv->gsc = NULL;
615 
616 	g_signal_emit_by_name(mqtt, "error", err);
617 	g_error_free(err);
618 }
619 
620 void
fb_mqtt_open(FbMqtt * mqtt,const gchar * host,gint port)621 fb_mqtt_open(FbMqtt *mqtt, const gchar *host, gint port)
622 {
623 	FbMqttPrivate *priv;
624 	PurpleAccount *acc;
625 
626 	g_return_if_fail(FB_IS_MQTT(mqtt));
627 	priv = mqtt->priv;
628 
629 	acc = purple_connection_get_account(priv->gc);
630 	fb_mqtt_close(mqtt);
631 	priv->gsc = purple_ssl_connect(acc, host, port, fb_mqtt_cb_open,
632 	                               fb_mqtt_cb_open_error, mqtt);
633 
634 	if (priv->gsc == NULL) {
635 		fb_mqtt_cb_open_error(NULL, 0, mqtt);
636 		return;
637 	}
638 
639 	fb_mqtt_timeout(mqtt);
640 }
641 
642 void
fb_mqtt_connect(FbMqtt * mqtt,guint8 flags,const GByteArray * pload)643 fb_mqtt_connect(FbMqtt *mqtt, guint8 flags, const GByteArray *pload)
644 {
645 	FbMqttMessage *msg;
646 
647 	g_return_if_fail(!fb_mqtt_connected(mqtt, FALSE));
648 	g_return_if_fail(pload != NULL);
649 
650 	/* Facebook always sends a CONNACK, use QoS1 */
651 	flags |= FB_MQTT_CONNECT_FLAG_QOS1;
652 
653 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_CONNECT, 0);
654 	fb_mqtt_message_write_str(msg, FB_MQTT_NAME);   /* Protocol name */
655 	fb_mqtt_message_write_byte(msg, FB_MQTT_LEVEL); /* Protocol level */
656 	fb_mqtt_message_write_byte(msg, flags);         /* Flags */
657 	fb_mqtt_message_write_u16(msg, FB_MQTT_KA);     /* Keep alive */
658 
659 	fb_mqtt_message_write(msg, pload->data, pload->len);
660 	fb_mqtt_write(mqtt, msg);
661 
662 	fb_mqtt_timeout(mqtt);
663 	g_object_unref(msg);
664 }
665 
666 gboolean
fb_mqtt_connected(FbMqtt * mqtt,gboolean error)667 fb_mqtt_connected(FbMqtt *mqtt, gboolean error)
668 {
669 	FbMqttPrivate *priv;
670 	gboolean connected;
671 
672 	g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
673 	priv = mqtt->priv;
674 	connected = (priv->gsc != NULL) && priv->connected;
675 
676 	if (!connected && error) {
677 		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
678 		              _("Not connected"));
679 	}
680 
681 	return connected;
682 }
683 
684 void
fb_mqtt_disconnect(FbMqtt * mqtt)685 fb_mqtt_disconnect(FbMqtt *mqtt)
686 {
687 	FbMqttMessage *msg;
688 
689 	if (G_UNLIKELY(!fb_mqtt_connected(mqtt, FALSE))) {
690 		return;
691 	}
692 
693 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_DISCONNECT, 0);
694 	fb_mqtt_write(mqtt, msg);
695 	g_object_unref(msg);
696 	fb_mqtt_close(mqtt);
697 }
698 
699 void
fb_mqtt_publish(FbMqtt * mqtt,const gchar * topic,const GByteArray * pload)700 fb_mqtt_publish(FbMqtt *mqtt, const gchar *topic, const GByteArray *pload)
701 {
702 	FbMqttMessage *msg;
703 	FbMqttPrivate *priv;
704 
705 	g_return_if_fail(FB_IS_MQTT(mqtt));
706 	g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
707 	priv = mqtt->priv;
708 
709 	/* Message identifier not required, but for consistency use QoS1 */
710 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH,
711 	                          FB_MQTT_MESSAGE_FLAG_QOS1);
712 
713 	fb_mqtt_message_write_str(msg, topic);      /* Message topic */
714 	fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
715 
716 	if (pload != NULL) {
717 		fb_mqtt_message_write(msg, pload->data, pload->len);
718 	}
719 
720 	fb_mqtt_write(mqtt, msg);
721 	g_object_unref(msg);
722 }
723 
724 void
fb_mqtt_subscribe(FbMqtt * mqtt,const gchar * topic1,guint16 qos1,...)725 fb_mqtt_subscribe(FbMqtt *mqtt, const gchar *topic1, guint16 qos1, ...)
726 {
727 	const gchar *topic;
728 	FbMqttMessage *msg;
729 	FbMqttPrivate *priv;
730 	guint16 qos;
731 	va_list ap;
732 
733 	g_return_if_fail(FB_IS_MQTT(mqtt));
734 	g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
735 	priv = mqtt->priv;
736 
737 	/* Facebook requires a message identifier, use QoS1 */
738 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE,
739 	                          FB_MQTT_MESSAGE_FLAG_QOS1);
740 
741 	fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
742 	fb_mqtt_message_write_str(msg, topic1);     /* First topics */
743 	fb_mqtt_message_write_byte(msg, qos1);      /* First QoS value */
744 
745 	va_start(ap, qos1);
746 
747 	while ((topic = va_arg(ap, const gchar*)) != NULL) {
748 		qos = va_arg(ap, guint);
749 		fb_mqtt_message_write_str(msg, topic); /* Remaining topics */
750 		fb_mqtt_message_write_byte(msg, qos);  /* Remaining QoS values */
751 	}
752 
753 	va_end(ap);
754 
755 	fb_mqtt_write(mqtt, msg);
756 	g_object_unref(msg);
757 }
758 
759 void
fb_mqtt_unsubscribe(FbMqtt * mqtt,const gchar * topic1,...)760 fb_mqtt_unsubscribe(FbMqtt *mqtt, const gchar *topic1, ...)
761 {
762 	const gchar *topic;
763 	FbMqttMessage *msg;
764 	FbMqttPrivate *priv;
765 	va_list ap;
766 
767 	g_return_if_fail(FB_IS_MQTT(mqtt));
768 	g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
769 	priv = mqtt->priv;
770 
771 	/* Facebook requires a message identifier, use QoS1 */
772 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE,
773 	                          FB_MQTT_MESSAGE_FLAG_QOS1);
774 
775 	fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
776 	fb_mqtt_message_write_str(msg, topic1);     /* First topic */
777 
778 	va_start(ap, topic1);
779 
780 	while ((topic = va_arg(ap, const gchar*)) != NULL) {
781 		fb_mqtt_message_write_str(msg, topic); /* Remaining topics */
782 	}
783 
784 	va_end(ap);
785 
786 	fb_mqtt_write(mqtt, msg);
787 	g_object_unref(msg);
788 }
789 
790 FbMqttMessage *
fb_mqtt_message_new(FbMqttMessageType type,FbMqttMessageFlags flags)791 fb_mqtt_message_new(FbMqttMessageType type, FbMqttMessageFlags flags)
792 {
793 	FbMqttMessage *msg;
794 	FbMqttMessagePrivate *priv;
795 
796 	msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
797 	priv = msg->priv;
798 
799 	priv->type = type;
800 	priv->flags = flags;
801 	priv->bytes = g_byte_array_new();
802 	priv->local = TRUE;
803 
804 	return msg;
805 }
806 
807 FbMqttMessage *
fb_mqtt_message_new_bytes(GByteArray * bytes)808 fb_mqtt_message_new_bytes(GByteArray *bytes)
809 {
810 	FbMqttMessage *msg;
811 	FbMqttMessagePrivate *priv;
812 	guint8 *byte;
813 
814 	g_return_val_if_fail(bytes != NULL, NULL);
815 	g_return_val_if_fail(bytes->len >= 2, NULL);
816 
817 	msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
818 	priv = msg->priv;
819 
820 	priv->bytes = bytes;
821 	priv->local = FALSE;
822 	priv->type = (*bytes->data & 0xF0) >> 4;
823 	priv->flags = *bytes->data & 0x0F;
824 
825 	/* Skip the fixed header */
826 	for (byte = priv->bytes->data + 1; (*(byte++) & 128) != 0; );
827 	priv->offset = byte - bytes->data;
828 	priv->pos = priv->offset;
829 
830 	return msg;
831 }
832 
833 void
fb_mqtt_message_reset(FbMqttMessage * msg)834 fb_mqtt_message_reset(FbMqttMessage *msg)
835 {
836 	FbMqttMessagePrivate *priv;
837 
838 	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
839 	priv = msg->priv;
840 
841 	if (priv->offset > 0) {
842 		g_byte_array_remove_range(priv->bytes, 0, priv->offset);
843 		priv->offset = 0;
844 		priv->pos = 0;
845 	}
846 }
847 
848 const GByteArray *
fb_mqtt_message_bytes(FbMqttMessage * msg)849 fb_mqtt_message_bytes(FbMqttMessage *msg)
850 {
851 	FbMqttMessagePrivate *priv;
852 	guint i;
853 	guint8 byte;
854 	guint8 sbuf[4];
855 	guint32 size;
856 
857 	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), NULL);
858 	priv = msg->priv;
859 
860 	i = 0;
861 	size = priv->bytes->len - priv->offset;
862 
863 	do {
864 		if (G_UNLIKELY(i >= G_N_ELEMENTS(sbuf))) {
865 			return NULL;
866 		}
867 
868 		byte = size % 128;
869 		size /= 128;
870 
871 		if (size > 0) {
872 			byte |= 128;
873 		}
874 
875 		sbuf[i++] = byte;
876 	} while (size > 0);
877 
878 	fb_mqtt_message_reset(msg);
879 	g_byte_array_prepend(priv->bytes, sbuf, i);
880 
881 	byte = ((priv->type & 0x0F) << 4) | (priv->flags & 0x0F);
882 	g_byte_array_prepend(priv->bytes, &byte, sizeof byte);
883 
884 	priv->pos = (i + 1) * (sizeof byte);
885 	return priv->bytes;
886 }
887 
888 gboolean
fb_mqtt_message_read(FbMqttMessage * msg,gpointer data,guint size)889 fb_mqtt_message_read(FbMqttMessage *msg, gpointer data, guint size)
890 {
891 	FbMqttMessagePrivate *priv;
892 
893 	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
894 	priv = msg->priv;
895 
896 	if ((priv->pos + size) > priv->bytes->len) {
897 		return FALSE;
898 	}
899 
900 	if ((data != NULL) && (size > 0)) {
901 		memcpy(data, priv->bytes->data + priv->pos, size);
902 	}
903 
904 	priv->pos += size;
905 	return TRUE;
906 }
907 
908 gboolean
fb_mqtt_message_read_r(FbMqttMessage * msg,GByteArray * bytes)909 fb_mqtt_message_read_r(FbMqttMessage *msg, GByteArray *bytes)
910 {
911 	FbMqttMessagePrivate *priv;
912 	guint size;
913 
914 	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
915 	priv = msg->priv;
916 	size = priv->bytes->len - priv->pos;
917 
918 	if (G_LIKELY(size > 0)) {
919 		g_byte_array_append(bytes, priv->bytes->data + priv->pos,
920 		                    size);
921 	}
922 
923 	return TRUE;
924 }
925 
926 gboolean
fb_mqtt_message_read_byte(FbMqttMessage * msg,guint8 * value)927 fb_mqtt_message_read_byte(FbMqttMessage *msg, guint8 *value)
928 {
929 	return fb_mqtt_message_read(msg, value, sizeof *value);
930 }
931 
932 gboolean
fb_mqtt_message_read_mid(FbMqttMessage * msg,guint16 * value)933 fb_mqtt_message_read_mid(FbMqttMessage *msg, guint16 *value)
934 {
935 	return fb_mqtt_message_read_u16(msg, value);
936 }
937 
938 gboolean
fb_mqtt_message_read_u16(FbMqttMessage * msg,guint16 * value)939 fb_mqtt_message_read_u16(FbMqttMessage *msg, guint16 *value)
940 {
941 	if (!fb_mqtt_message_read(msg, value, sizeof *value)) {
942 		return FALSE;
943 	}
944 
945 	if (value != NULL) {
946 		*value = g_ntohs(*value);
947 	}
948 
949 	return TRUE;
950 }
951 
952 gboolean
fb_mqtt_message_read_str(FbMqttMessage * msg,gchar ** value)953 fb_mqtt_message_read_str(FbMqttMessage *msg, gchar **value)
954 {
955 	guint8 *data;
956 	guint16 size;
957 
958 	if (!fb_mqtt_message_read_u16(msg, &size)) {
959 		return FALSE;
960 	}
961 
962 	if (value != NULL) {
963 		data = g_new(guint8, size + 1);
964 		data[size] = 0;
965 	} else {
966 		data = NULL;
967 	}
968 
969 	if (!fb_mqtt_message_read(msg, data, size)) {
970 		g_free(data);
971 		return FALSE;
972 	}
973 
974 	if (value != NULL) {
975 		*value = (gchar *) data;
976 	}
977 
978 	return TRUE;
979 }
980 
981 void
fb_mqtt_message_write(FbMqttMessage * msg,gconstpointer data,guint size)982 fb_mqtt_message_write(FbMqttMessage *msg, gconstpointer data, guint size)
983 {
984 	FbMqttMessagePrivate *priv;
985 
986 	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
987 	priv = msg->priv;
988 
989 	g_byte_array_append(priv->bytes, data, size);
990 	priv->pos += size;
991 }
992 
993 void
fb_mqtt_message_write_byte(FbMqttMessage * msg,guint8 value)994 fb_mqtt_message_write_byte(FbMqttMessage *msg, guint8 value)
995 {
996 	fb_mqtt_message_write(msg, &value, sizeof value);
997 }
998 
999 void
fb_mqtt_message_write_mid(FbMqttMessage * msg,guint16 * value)1000 fb_mqtt_message_write_mid(FbMqttMessage *msg, guint16 *value)
1001 {
1002 	g_return_if_fail(value != NULL);
1003 	fb_mqtt_message_write_u16(msg, ++(*value));
1004 }
1005 
1006 void
fb_mqtt_message_write_u16(FbMqttMessage * msg,guint16 value)1007 fb_mqtt_message_write_u16(FbMqttMessage *msg, guint16 value)
1008 {
1009 	value = g_htons(value);
1010 	fb_mqtt_message_write(msg, &value, sizeof value);
1011 }
1012 
1013 void
fb_mqtt_message_write_str(FbMqttMessage * msg,const gchar * value)1014 fb_mqtt_message_write_str(FbMqttMessage *msg, const gchar *value)
1015 {
1016 	gint16 size;
1017 
1018 	g_return_if_fail(value != NULL);
1019 
1020 	size = strlen(value);
1021 	fb_mqtt_message_write_u16(msg, size);
1022 	fb_mqtt_message_write(msg, value, size);
1023 }
1024