1 /* This file is part of Cawbird, a Gtk+ linux Twitter client forked from Corebird.
2 * Copyright (C) 2017 Timm Bäder (Corebird)
3 *
4 * Cawbird is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * Cawbird is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with cawbird. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #include "cawbird.h"
19 #include "CbUserStream.h"
20 #include "CbUtils.h"
21 #include <rest/oauth-proxy.h>
22 #include <string.h>
23
24 #define short_url_length 23
25
26 G_DEFINE_TYPE (CbUserStream, cb_user_stream, G_TYPE_OBJECT);
27
28 gboolean load_timeline_tweets (gpointer user_data);
29 gboolean load_mentions_tweets (gpointer user_data);
30 gboolean load_favourited_tweets (gpointer user_data);
31 gboolean load_dm_tweets (gpointer user_data);
32
33 enum {
34 INTERRUPTED,
35 RESUMED,
36 LAST_SIGNAL
37 };
38
39 enum {
40 STATE_STOPPED, /* Initial state */
41 STATE_RUNNING, /* Started and message received */
42 STATE_STARTED, /* Started, but no message/heartbeat received yet */
43 STATE_STOPPING, /* Stopping the stream */
44 };
45
46 static guint user_stream_signals[LAST_SIGNAL] = { 0 };
47
48 static void
cb_user_stream_finalize(GObject * o)49 cb_user_stream_finalize (GObject *o)
50 {
51 CbUserStream *self = CB_USER_STREAM (o);
52
53 cb_user_stream_stop (self);
54
55 g_ptr_array_unref (self->receivers);
56 g_free (self->account_name);
57
58 if (self->network_changed_id != 0)
59 {
60 g_signal_handler_disconnect (self->network_monitor, self->network_changed_id);
61 }
62
63 G_OBJECT_CLASS (cb_user_stream_parent_class)->finalize (o);
64 }
65
66 static void
cb_user_stream_restart(CbUserStream * self)67 cb_user_stream_restart (CbUserStream *self)
68 {
69 self->restarting = TRUE;
70 cb_user_stream_stop (self);
71 cb_user_stream_start (self);
72 }
73
74 static gboolean
network_cb(gpointer user_data)75 network_cb (gpointer user_data)
76 {
77 CbUserStream *self = user_data;
78 gboolean available;
79
80 if (self->state == STATE_RUNNING)
81 {
82 self->network_timeout_id = 0;
83 return G_SOURCE_REMOVE;
84 }
85
86 available = g_network_monitor_get_network_available (self->network_monitor);
87
88 if (available)
89 {
90 g_debug ("%u Restarting stream (reason: network available (timeout))", self->state);
91 self->network_timeout_id = 0;
92 cb_user_stream_restart (self);
93 return G_SOURCE_REMOVE;
94 }
95
96 return G_SOURCE_CONTINUE;
97 }
98
99 static void
start_network_timeout(CbUserStream * self)100 start_network_timeout (CbUserStream *self)
101 {
102 if (self->network_timeout_id != 0)
103 return;
104
105 self->network_timeout_id = g_timeout_add (1 * 1000, network_cb, self);
106 }
107
108 static void
network_changed_cb(GNetworkMonitor * monitor,gboolean available,gpointer user_data)109 network_changed_cb (GNetworkMonitor *monitor,
110 gboolean available,
111 gpointer user_data)
112 {
113 CbUserStream *self = user_data;
114
115 if (available == self->network_available)
116 return;
117
118 self->network_available = available;
119
120 if (available)
121 {
122 g_debug ("%u Restarting stream (reason: Network available (callback))", self->state);
123 cb_user_stream_restart (self);
124 }
125 else
126 {
127 g_debug ("%u Connection lost (%s) Reason: network unavailable", self->state, self->account_name);
128 g_signal_emit (self, user_stream_signals[INTERRUPTED], 0);
129 cb_clear_source (&self->heartbeat_timeout_id);
130
131 start_network_timeout (self);
132 }
133 }
134
135 static void
cb_user_stream_init(CbUserStream * self)136 cb_user_stream_init (CbUserStream *self)
137 {
138 self->receivers = g_ptr_array_new ();
139 self->restarting = FALSE;
140 self->state = STATE_STOPPED;
141 self->proxy_data_set = FALSE;
142
143 self->network_monitor = g_network_monitor_get_default ();
144 self->network_available = g_network_monitor_get_network_available (self->network_monitor);
145 self->network_changed_id = g_signal_connect (self->network_monitor,
146 "network-changed",
147 G_CALLBACK (network_changed_cb), self);
148
149 if (!self->network_available)
150 start_network_timeout (self);
151 }
152
153 static void
cb_user_stream_class_init(CbUserStreamClass * klass)154 cb_user_stream_class_init (CbUserStreamClass *klass)
155 {
156 GObjectClass *object_class = G_OBJECT_CLASS (klass);
157
158 object_class->finalize = cb_user_stream_finalize;
159
160 user_stream_signals[INTERRUPTED] = g_signal_new ("interrupted",
161 G_OBJECT_CLASS_TYPE (object_class),
162 G_SIGNAL_RUN_FIRST,
163 0,
164 NULL, NULL,
165 NULL, G_TYPE_NONE, 0);
166
167 user_stream_signals[RESUMED] = g_signal_new ("resumed",
168 G_OBJECT_CLASS_TYPE (object_class),
169 G_SIGNAL_RUN_FIRST,
170 0,
171 NULL, NULL,
172 NULL, G_TYPE_NONE, 0);
173 }
174
175 CbUserStream *
cb_user_stream_new(const char * account_name,OAuthProxy * proxy)176 cb_user_stream_new (const char *account_name, OAuthProxy *proxy)
177 {
178 CbUserStream *self = CB_USER_STREAM (g_object_new (CB_TYPE_USER_STREAM, NULL));
179 self->account_name = g_strdup (account_name);
180 self->proxy = REST_PROXY(proxy);
181
182 g_debug ("Creating stream for %s", account_name);
183
184 return self;
185 }
186
187 void
stream_tweet(CbUserStream * self,CbStreamMessageType message_type,JsonNode * node)188 stream_tweet (CbUserStream *self,
189 CbStreamMessageType message_type,
190 JsonNode *node) {
191 guint i;
192
193 #if DEBUG
194 JsonGenerator *gen = json_generator_new ();
195 json_generator_set_root (gen, node);
196 json_generator_set_pretty (gen, FALSE);
197 gchar *json_dump = json_generator_to_data (gen, NULL);
198 g_debug ("Message with type %d on stream @%s: %s", message_type, self->account_name, json_dump);
199 #else
200 g_debug ("Message with type %d on stream @%s", message_type, self->account_name);
201 #endif
202
203 if (message_type == CB_STREAM_MESSAGE_UNSUPPORTED) {
204 g_debug ("Skipped unsupported message on stream @%s\n", self->account_name);
205 return;
206 }
207
208 for (i = 0; i < self->receivers->len; i++) {
209 cb_message_receiver_stream_message_received (g_ptr_array_index (self->receivers, i),
210 message_type,
211 node);
212 }
213 }
214
215 void
cb_user_stream_inject_tweet(CbUserStream * self,CbStreamMessageType message_type,const gchar * content)216 cb_user_stream_inject_tweet (CbUserStream *self,
217 CbStreamMessageType message_type,
218 const gchar *content) {
219 JsonParser *parser;
220 JsonNode *root_node;
221 JsonObject *root_obj;
222 GError *error = NULL;
223
224 parser = json_parser_new ();
225 json_parser_load_from_data (parser, content, -1, &error);
226 if (error)
227 {
228 g_warning("Failed to parse %s", content);
229 return;
230 }
231 root_node = json_parser_get_root (parser);
232 root_obj = json_node_get_object (root_node);
233
234 if (json_object_has_member (root_obj, "event"))
235 {
236 // We've got a single DM, which is wrapped, so unwrap it
237 root_node = json_object_get_member (root_obj, "event");
238 }
239 else if (json_object_has_member (root_obj, "quoted_status_permalink"))
240 {
241 // Quote tweets don't include the quoted tweet URL in the returned text, but they do when they come in the timeline
242 // So we need to fudge it here and add the URL entity and the text before we send it to the app
243 JsonObject *entities;
244 JsonArray *urls;
245 JsonObject *permalink = json_object_get_object_member (root_obj, "quoted_status_permalink");
246 const gchar *quoted_url = json_object_get_string_member (permalink, "url");
247 entities = json_object_get_object_member (root_obj, "entities");
248 urls = json_object_get_array_member (entities, "urls");
249 gboolean url_found = FALSE;
250
251 for (guint i = 0, p = json_array_get_length (urls); i < p; i ++)
252 {
253 JsonObject *url_obj = json_node_get_object (json_array_get_element (urls, i));
254 const char *url = json_object_get_string_member (url_obj, "url");
255
256 if (!g_strcmp0 (url, quoted_url))
257 {
258 url_found = TRUE;
259 break;
260 }
261 }
262
263 if (!url_found)
264 {
265 // Get the old text length
266 JsonArray *display_range = json_object_get_array_member (root_obj, "display_text_range");
267 guint64 old_length = json_array_get_int_element (display_range, 1);
268
269 // Create and set the new text
270 gchar *new_full_text = g_strdup_printf ("%s %s", json_object_get_string_member (root_obj, "full_text"), quoted_url);
271 json_object_set_string_member (root_obj, "full_text", new_full_text);
272 guint64 old_length_with_space = old_length + 1;
273 guint64 new_length = old_length_with_space + short_url_length;
274 g_free (new_full_text);
275
276 // Build the URL entity
277 JsonObject *url_obj = json_object_new ();
278 json_object_set_string_member (url_obj, "url", quoted_url);
279 json_object_set_string_member (url_obj, "expanded_url", json_object_get_string_member (permalink, "expanded"));
280 json_object_set_string_member (url_obj, "display_url", json_object_get_string_member (permalink, "display"));
281 JsonArray *indicies = json_array_sized_new (2);
282 json_array_add_int_element (indicies, old_length_with_space);
283 json_array_add_int_element (indicies, new_length);
284 json_object_set_array_member (url_obj, "indices", indicies);
285 json_array_add_object_element (urls, url_obj);
286
287 // Update the text length
288 json_array_remove_element (display_range, 1);
289 json_array_add_int_element (display_range, new_length);
290 }
291 }
292
293 stream_tweet (self, message_type, root_node);
294 }
295
296 // TODO: Refactor a common "load_tweets_done" that parses, sets the last ID and sends the right message type
297 void
load_timeline_tweets_done(GObject * source_object,GAsyncResult * result,gpointer user_data)298 load_timeline_tweets_done (GObject *source_object,
299 GAsyncResult *result,
300 gpointer user_data) {
301 CbUserStream *self = user_data;
302 GError *error = NULL;
303
304 JsonNode *root_node;
305 JsonArray *root_arr;
306 guint len;
307
308 root_node = cb_utils_load_threaded_finish (result, &error);
309
310 if (error != NULL)
311 {
312 g_warning ("%s: %s (%s - %d)", __FUNCTION__, error->message, g_quark_to_string (error->domain), error->code);
313 if (error->domain == REST_PROXY_ERROR && error->code == REST_PROXY_ERROR_SSL) {
314 g_debug ("Reloading timeline on SSL failure");
315 load_timeline_tweets (self);
316 }
317 return;
318 }
319
320 root_arr = json_node_get_array (root_node);
321 len = json_array_get_length (root_arr);
322
323 g_debug ("Got %d timeline tweets", len);
324 gboolean first_load = self->last_home_id == 0;
325
326 for (guint i = len; i > 0; i--) {
327 JsonNode *node = json_array_get_element (root_arr, i - 1);
328 JsonObject *obj = json_node_get_object (node);
329 self->last_home_id = json_object_get_int_member (obj, "id");
330 stream_tweet(self, CB_STREAM_MESSAGE_TWEET, node);
331 }
332
333 if (first_load) {
334 stream_tweet (self, CB_STREAM_MESSAGE_TIMELINE_LOADED, json_node_new(JSON_NODE_NULL));
335 }
336
337 g_cancellable_cancel(self->home_cancellable);
338 self->home_cancellable = NULL;
339 }
340
341 gboolean
load_timeline_tweets(gpointer user_data)342 load_timeline_tweets (gpointer user_data)
343 {
344 CbUserStream *self = user_data;
345
346 if (self->home_cancellable && ! g_cancellable_is_cancelled(self->home_cancellable)) {
347 g_debug ("Cancelling existing timeline cancellable");
348 g_cancellable_cancel(self->home_cancellable);
349 }
350
351 gboolean is_first_load = self->last_home_id == 0;
352 char* requested_tweet_count = is_first_load ? "28" : "200";
353 RestProxyCall *proxy_call = rest_proxy_new_call (self->proxy);
354 g_debug("Loading timeline tweets");
355 rest_proxy_call_set_function (proxy_call, "1.1/statuses/home_timeline.json");
356 rest_proxy_call_set_method (proxy_call, "GET");
357 rest_proxy_call_add_param (proxy_call, "count", requested_tweet_count);
358 rest_proxy_call_add_param (proxy_call, "contributor_details", "true");
359 rest_proxy_call_add_param (proxy_call, "include_my_retweet", "true");
360 rest_proxy_call_add_param (proxy_call, "tweet_mode", "extended");
361 rest_proxy_call_add_param (proxy_call, "include_ext_alt_text", "true");
362
363 if (!is_first_load) {
364 char since_id [21];
365 // We may occasionally miss tweets (bug #147). This appears to be because of eventual consistency (tweets appear at the server
366 // that we query *after* our last query but are timestamped *before* the 'since' ID for that query). So we need to try and overlap a bit.
367 // Tweet IDs are "snowflakes" with 12 bits of sequence (lowest), 5 bits of worker ID, 5 bits of data centre, and then the timestamp.
368 // https://github.com/twitter-archive/snowflake/blob/snowflake-2010/src/main/scala/com/twitter/service/snowflake/IdWorker.scala#L27-L36
369 // The timestamp is to millisecond accuracy, so we want to ignore the last three base-10 digits. Plus more digits to give more than
370 // one second of overlap. 10 bits is ~1s and every extra bit is double that.
371 // We mask the ID with that value to see whether we get any missed tweets.
372 // Note: this will result in at least the last tweet being reloaded each time.
373 gint timestamp_shift = 5 + 5 + 12;
374 gint overlap_shift = 13; // 13bits ~= 8 seconds
375 sprintf(since_id, "%" G_GINT64_FORMAT, self->last_home_id & (-1L << (timestamp_shift + overlap_shift)));
376 rest_proxy_call_add_param(proxy_call, "since_id", since_id);
377 }
378
379 self->home_cancellable = g_cancellable_new();
380 cb_utils_load_threaded_async (proxy_call, self->home_cancellable, load_timeline_tweets_done, self);
381 return TRUE;
382 }
383
384 void
load_mentions_tweets_done(GObject * source_object,GAsyncResult * result,gpointer user_data)385 load_mentions_tweets_done (GObject *source_object,
386 GAsyncResult *result,
387 gpointer user_data) {
388 CbUserStream *self = user_data;
389 GError *error = NULL;
390
391 JsonNode *root_node;
392 JsonArray *root_arr;
393 guint len;
394
395 root_node = cb_utils_load_threaded_finish (result, &error);
396
397 if (error != NULL)
398 {
399 g_warning ("%s: %s (%s - %d)", __FUNCTION__, error->message, g_quark_to_string (error->domain), error->code);
400 if (error->domain == REST_PROXY_ERROR && error->code == REST_PROXY_ERROR_SSL) {
401 g_debug ("Reloading mentions on SSL failure");
402 load_mentions_tweets (self);
403 }
404 return;
405 }
406
407 root_arr = json_node_get_array (root_node);
408 len = json_array_get_length (root_arr);
409
410 g_debug ("Got %d mention tweets", len);
411 gboolean first_load = self->last_mentions_id == 0;
412
413 for (guint i = len; i > 0; i--) {
414 JsonNode *node = json_array_get_element (root_arr, i - 1);
415 JsonObject *obj = json_node_get_object (node);
416 self->last_mentions_id = json_object_get_int_member (obj, "id");
417 stream_tweet(self, CB_STREAM_MESSAGE_MENTION, node);
418 }
419
420 if (first_load) {
421 stream_tweet (self, CB_STREAM_MESSAGE_MENTIONS_LOADED, json_node_new(JSON_NODE_NULL));
422 }
423
424 g_cancellable_cancel(self->mentions_cancellable);
425 self->mentions_cancellable = NULL;
426 }
427
428 gboolean
load_mentions_tweets(gpointer user_data)429 load_mentions_tweets (gpointer user_data)
430 {
431 CbUserStream *self = user_data;
432
433 if (self->mentions_cancellable && ! g_cancellable_is_cancelled(self->mentions_cancellable)) {
434 g_debug ("Cancelling existing mentions cancellable");
435 g_cancellable_cancel(self->mentions_cancellable);
436 }
437
438 gboolean is_first_load = self->last_mentions_id == 0;
439 char* requested_tweet_count = is_first_load ? "28" : "200";
440 RestProxyCall *proxy_call = rest_proxy_new_call (self->proxy);
441 g_debug("Loading mention tweets");
442 rest_proxy_call_set_function (proxy_call, "1.1/statuses/mentions_timeline.json");
443 rest_proxy_call_set_method (proxy_call, "GET");
444 rest_proxy_call_add_param (proxy_call, "count", requested_tweet_count);
445 rest_proxy_call_add_param (proxy_call, "contributor_details", "true");
446 rest_proxy_call_add_param (proxy_call, "include_my_retweet", "true");
447 rest_proxy_call_add_param (proxy_call, "include_entities", "true");
448 rest_proxy_call_add_param (proxy_call, "tweet_mode", "extended");
449 rest_proxy_call_add_param (proxy_call, "include_ext_alt_text", "true");
450
451 if (!is_first_load) {
452 char since_id [20];
453 sprintf(since_id, "%ld", self->last_mentions_id);
454 rest_proxy_call_add_param(proxy_call, "since_id", since_id);
455 }
456
457 self->mentions_cancellable = g_cancellable_new();
458 cb_utils_load_threaded_async (proxy_call, self->mentions_cancellable, load_mentions_tweets_done, self);
459 return TRUE;
460 }
461
462 void
load_favourited_tweets_done(GObject * source_object,GAsyncResult * result,gpointer user_data)463 load_favourited_tweets_done (GObject *source_object,
464 GAsyncResult *result,
465 gpointer user_data) {
466 CbUserStream *self = user_data;
467 GError *error = NULL;
468
469 JsonNode *root_node;
470 JsonArray *root_arr;
471 guint len;
472
473 root_node = cb_utils_load_threaded_finish (result, &error);
474
475 if (error != NULL)
476 {
477 g_warning ("%s: %s (%s - %d)", __FUNCTION__, error->message, g_quark_to_string (error->domain), error->code);
478 if (error->domain == REST_PROXY_ERROR && error->code == REST_PROXY_ERROR_SSL) {
479 g_debug ("Reloading favorited on SSL failure");
480 load_favourited_tweets (self);
481 }
482 return;
483 }
484
485 root_arr = json_node_get_array (root_node);
486 len = json_array_get_length (root_arr);
487
488 g_debug ("Got %d favourited tweets", len);
489 gboolean first_load = self->last_favourited_id == 0;
490
491 for (guint i = len; i > 0; i--) {
492 JsonNode *node = json_array_get_element (root_arr, i - 1);
493 JsonObject *obj = json_node_get_object (node);
494 self->last_favourited_id = json_object_get_int_member (obj, "id");
495 stream_tweet(self, CB_STREAM_MESSAGE_EVENT_FAVORITE, node);
496 }
497
498 if (first_load) {
499 stream_tweet (self, CB_STREAM_MESSAGE_FAVORITES_LOADED, json_node_new(JSON_NODE_NULL));
500 }
501
502 g_cancellable_cancel(self->favourited_cancellable);
503 self->favourited_cancellable = NULL;
504 }
505
506 gboolean
load_favourited_tweets(gpointer user_data)507 load_favourited_tweets (gpointer user_data)
508 {
509 CbUserStream *self = user_data;
510
511 if (self->favourited_cancellable && ! g_cancellable_is_cancelled(self->favourited_cancellable)) {
512 g_debug ("Cancelling existing favourites cancellable");
513 g_cancellable_cancel(self->favourited_cancellable);
514 }
515
516 gboolean is_first_load = self->last_favourited_id == 0;
517 char* requested_tweet_count = is_first_load ? "28" : "200";
518 RestProxyCall *proxy_call = rest_proxy_new_call (self->proxy);
519 g_debug("Loading favourited tweets");
520 rest_proxy_call_set_function (proxy_call, "1.1/favorites/list.json");
521 rest_proxy_call_set_method (proxy_call, "GET");
522 rest_proxy_call_add_param (proxy_call, "count", requested_tweet_count);
523 rest_proxy_call_add_param (proxy_call, "contributor_details", "true");
524 rest_proxy_call_add_param (proxy_call, "include_my_retweet", "true");
525 rest_proxy_call_add_param (proxy_call, "include_entities", "true");
526 rest_proxy_call_add_param (proxy_call, "tweet_mode", "extended");
527 rest_proxy_call_add_param (proxy_call, "include_ext_alt_text", "true");
528
529 if (!is_first_load) {
530 char since_id [20];
531 sprintf(since_id, "%ld", self->last_favourited_id);
532 rest_proxy_call_add_param(proxy_call, "since_id", since_id);
533 }
534
535 self->favourited_cancellable = g_cancellable_new();
536 cb_utils_load_threaded_async (proxy_call, self->favourited_cancellable, load_favourited_tweets_done, self);
537 return TRUE;
538 }
539
540 // Fix a cyclic definition
541 void
542 load_dm_tweets_with_cursor (gpointer user_data, const gchar *cursor);
543
544 void
load_dm_tweets_done(GObject * source_object,GAsyncResult * result,gpointer user_data)545 load_dm_tweets_done (GObject *source_object,
546 GAsyncResult *result,
547 gpointer user_data) {
548 CbUserStream *self = user_data;
549 GError *error = NULL;
550
551 JsonNode *root_node;
552 JsonObject *root_obj;
553 JsonArray *root_arr;
554 guint len;
555
556 root_node = cb_utils_load_threaded_finish (result, &error);
557
558 if (error != NULL)
559 {
560 g_warning ("%s: %s (%s - %d)", __FUNCTION__, error->message, g_quark_to_string (error->domain), error->code);
561 if (error->domain == REST_PROXY_ERROR && error->code == REST_PROXY_ERROR_SSL) {
562 g_debug ("Reloading DMs on SSL failure");
563 load_dm_tweets (self);
564 }
565 return;
566 }
567
568 root_obj = json_node_get_object (root_node);
569 root_arr = json_object_get_array_member(root_obj, "events");
570 len = json_array_get_length (root_arr);
571
572 // TODO: Look for a "next_cursor" and load older DMs
573 // https://developer.twitter.com/en/docs/direct-messages/sending-and-receiving/api-reference/list-events
574
575 g_debug ("Got %d DMs", len);
576
577 gboolean all_newer = TRUE;
578 gboolean all_older = TRUE;
579
580 for (guint i = len; i > 0; i--) {
581 JsonNode *node = json_array_get_element (root_arr, i - 1);
582 JsonObject *obj = json_node_get_object (node);
583 int message_type = CB_STREAM_MESSAGE_UNSUPPORTED;
584 const gchar *type = json_object_get_string_member(obj, "type");
585
586 if (strcmp(type, "message_create") == 0) {
587 message_type = CB_STREAM_MESSAGE_DIRECT_MESSAGE;
588 }
589
590 gint64 id = strtol (json_object_get_string_member (obj, "id"), NULL, 10);
591
592 if (id < self->first_dm_id) {
593 self->first_dm_id = id;
594 all_newer = FALSE;
595 }
596 else if (id <= self->last_dm_id) {
597 all_older = FALSE;
598 all_newer = FALSE;
599 // DMs behave differently to other "timelines" so we need to ignore messages we've seen
600 // And we assume we've seen it if it has an older ID. But we can't break because later
601 // in the collection is newer and might be unseen.
602 continue;
603 }
604 else {
605 all_older = FALSE;
606 if (id > self->new_last_dm_id) {
607 self->new_last_dm_id = id;
608 }
609 }
610
611 stream_tweet (self, message_type, node);
612 }
613
614 g_cancellable_cancel(self->dm_cancellable);
615 self->dm_cancellable = NULL;
616
617 gboolean first_load = self->last_dm_id == 0;
618 // Limit fetches for throttling. 5 at first load lets us load ~300 old DMs from last 30 days.
619 // 1 recursion when scheduled every 2 minutes runs right to the "15/15min" limit.
620 // This *should* only cause throttling problems just when someone has a big backlog AND has
621 // lots of tweets coming in every 2 minutes.
622 unsigned char max_recursions = first_load ? 5 : 1;
623
624 if ((all_newer || all_older) && self->dm_recursions < max_recursions && json_object_has_member(root_obj, "next_cursor")) {
625 self->dm_recursions++;
626 const gchar *cursor = json_object_get_string_member(root_obj, "next_cursor");
627 load_dm_tweets_with_cursor(user_data, cursor);
628 }
629 else {
630 if (first_load) {
631 stream_tweet (self, CB_STREAM_MESSAGE_DIRECT_MESSAGES_LOADED, json_node_new(JSON_NODE_NULL));
632 }
633
634 self->last_dm_id = self->new_last_dm_id;
635 self->dm_recursions = 0;
636 }
637 }
638
639 void
load_dm_tweets_with_cursor(gpointer user_data,const gchar * cursor)640 load_dm_tweets_with_cursor (gpointer user_data, const gchar *cursor)
641 {
642 CbUserStream *self = user_data;
643
644 if (self->dm_cancellable && ! g_cancellable_is_cancelled(self->dm_cancellable)) {
645 g_debug ("Cancelling existing cancellable");
646 g_cancellable_cancel(self->dm_cancellable);
647 }
648
649 RestProxyCall *proxy_call = rest_proxy_new_call (self->proxy);
650 rest_proxy_call_set_function (proxy_call, "1.1/direct_messages/events/list.json");
651 rest_proxy_call_set_method (proxy_call, "GET");
652 rest_proxy_call_add_param (proxy_call, "count", "50");
653 if (cursor) {
654 rest_proxy_call_add_param(proxy_call, "cursor", cursor);
655 }
656 g_debug("Loading DM tweets for cursor %s", cursor ? cursor : "none");
657
658 self->dm_cancellable = g_cancellable_new();
659 cb_utils_load_threaded_async (proxy_call, self->dm_cancellable, load_dm_tweets_done, self);
660 }
661
662 gboolean
load_dm_tweets(gpointer user_data)663 load_dm_tweets (gpointer user_data)
664 {
665 load_dm_tweets_with_cursor(user_data, NULL);
666 return TRUE;
667 }
668
669 void
cb_user_stream_start(CbUserStream * self)670 cb_user_stream_start (CbUserStream *self)
671 {
672 g_debug("Loading timeline tweets on start");
673 load_timeline_tweets (self);
674 g_debug("Loading mention tweets on start");
675 load_mentions_tweets (self);
676 g_debug("Loading favourited tweets on start");
677 load_favourited_tweets (self);
678 g_debug("Loading DMs on start");
679 load_dm_tweets (self);
680
681 if (!self->timeline_timeout) {
682 g_debug("Adding timeout for timeline");
683 self->timeline_timeout = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 60 * 2, load_timeline_tweets, self, NULL);
684 }
685 if (!self->mentions_timeout) {
686 g_debug("Adding timeout for mentions");
687 self->mentions_timeout = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 60 * 2, load_mentions_tweets, self, NULL);
688 }
689 if (!self->favourited_timeout) {
690 g_debug("Adding timeout for favourites");
691 self->favourited_timeout = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 60 * 2, load_favourited_tweets, self, NULL);
692 }
693 if (!self->dm_timeout) {
694 g_debug("Adding timeout for DMs");
695 self->dm_timeout = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 60 * 2, load_dm_tweets, self, NULL);
696 }
697 }
698
/*
 * Stops the periodic polling by removing whichever of the four timeouts are
 * installed and zeroing their ids. Requests already in flight are not
 * cancelled here.
 */
void
cb_user_stream_stop (CbUserStream *self)
{
  if (self->timeline_timeout != 0)
    {
      g_source_remove (self->timeline_timeout);
      self->timeline_timeout = 0;
    }

  if (self->mentions_timeout != 0)
    {
      g_source_remove (self->mentions_timeout);
      self->mentions_timeout = 0;
    }

  if (self->favourited_timeout != 0)
    {
      g_source_remove (self->favourited_timeout);
      self->favourited_timeout = 0;
    }

  if (self->dm_timeout != 0)
    {
      g_source_remove (self->dm_timeout);
      self->dm_timeout = 0;
    }
}
718
719 void
cb_user_stream_set_proxy_data(CbUserStream * self,const char * token,const char * token_secret)720 cb_user_stream_set_proxy_data (CbUserStream *self,
721 const char *token,
722 const char *token_secret)
723 {
724 // oauth_proxy_set_token (OAUTH_PROXY (self->proxy), token);
725 // oauth_proxy_set_token_secret (OAUTH_PROXY (self->proxy), token_secret);
726
727 self->proxy_data_set = TRUE;
728 }
729
730 void
cb_user_stream_register(CbUserStream * self,CbMessageReceiver * receiver)731 cb_user_stream_register (CbUserStream *self,
732 CbMessageReceiver *receiver)
733 {
734 g_ptr_array_add (self->receivers, receiver);
735 }
736
737 void
cb_user_stream_unregister(CbUserStream * self,CbMessageReceiver * receiver)738 cb_user_stream_unregister (CbUserStream *self,
739 CbMessageReceiver *receiver)
740 {
741 guint i;
742
743 for (i = 0; i < self->receivers->len; i ++)
744 {
745 CbMessageReceiver *r = g_ptr_array_index (self->receivers, i);
746
747 if (r == receiver)
748 {
749 g_ptr_array_remove_index_fast (self->receivers, i);
750 break;
751 }
752 }
753 }