1 /*  This file is part of Cawbird, a Gtk+ linux Twitter client forked from Corebird.
2  *  Copyright (C) 2017 Timm Bäder (Corebird)
3  *
4  *  Cawbird is free software: you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation, either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  Cawbird is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with cawbird.  If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 #include "cawbird.h"
19 #include "CbUserStream.h"
20 #include "CbUtils.h"
21 #include <rest/oauth-proxy.h>
22 #include <string.h>
23 
24 #define short_url_length 23
25 
26 G_DEFINE_TYPE (CbUserStream, cb_user_stream, G_TYPE_OBJECT);
27 
28 gboolean load_timeline_tweets (gpointer user_data);
29 gboolean load_mentions_tweets (gpointer user_data);
30 gboolean load_favourited_tweets (gpointer user_data);
31 gboolean load_dm_tweets (gpointer user_data);
32 
33 enum {
34   INTERRUPTED,
35   RESUMED,
36   LAST_SIGNAL
37 };
38 
39 enum {
40   STATE_STOPPED,    /* Initial state */
41   STATE_RUNNING,    /* Started and message received */
42   STATE_STARTED,    /* Started, but no message/heartbeat received yet */
43   STATE_STOPPING,   /* Stopping the stream */
44 };
45 
46 static guint user_stream_signals[LAST_SIGNAL] = { 0 };
47 
48 static void
cb_user_stream_finalize(GObject * o)49 cb_user_stream_finalize (GObject *o)
50 {
51   CbUserStream *self = CB_USER_STREAM (o);
52 
53   cb_user_stream_stop (self);
54 
55   g_ptr_array_unref (self->receivers);
56   g_free (self->account_name);
57 
58   if (self->network_changed_id != 0)
59     {
60       g_signal_handler_disconnect (self->network_monitor, self->network_changed_id);
61     }
62 
63   G_OBJECT_CLASS (cb_user_stream_parent_class)->finalize (o);
64 }
65 
66 static void
cb_user_stream_restart(CbUserStream * self)67 cb_user_stream_restart (CbUserStream *self)
68 {
69   self->restarting = TRUE;
70   cb_user_stream_stop (self);
71   cb_user_stream_start (self);
72 }
73 
74 static gboolean
network_cb(gpointer user_data)75 network_cb (gpointer user_data)
76 {
77   CbUserStream *self = user_data;
78   gboolean available;
79 
80   if (self->state == STATE_RUNNING)
81     {
82       self->network_timeout_id = 0;
83       return G_SOURCE_REMOVE;
84     }
85 
86   available = g_network_monitor_get_network_available (self->network_monitor);
87 
88   if (available)
89     {
90       g_debug ("%u Restarting stream (reason: network available (timeout))", self->state);
91       self->network_timeout_id = 0;
92       cb_user_stream_restart (self);
93       return G_SOURCE_REMOVE;
94     }
95 
96   return G_SOURCE_CONTINUE;
97 }
98 
99 static void
start_network_timeout(CbUserStream * self)100 start_network_timeout (CbUserStream *self)
101 {
102   if (self->network_timeout_id != 0)
103     return;
104 
105   self->network_timeout_id = g_timeout_add (1 * 1000, network_cb, self);
106 }
107 
108 static void
network_changed_cb(GNetworkMonitor * monitor,gboolean available,gpointer user_data)109 network_changed_cb (GNetworkMonitor *monitor,
110                     gboolean         available,
111                     gpointer         user_data)
112 {
113   CbUserStream *self = user_data;
114 
115   if (available == self->network_available)
116     return;
117 
118   self->network_available = available;
119 
120   if (available)
121     {
122       g_debug ("%u Restarting stream (reason: Network available (callback))", self->state);
123       cb_user_stream_restart (self);
124     }
125   else
126     {
127       g_debug ("%u Connection lost (%s) Reason: network unavailable", self->state, self->account_name);
128       g_signal_emit (self, user_stream_signals[INTERRUPTED], 0);
129       cb_clear_source (&self->heartbeat_timeout_id);
130 
131       start_network_timeout (self);
132     }
133 }
134 
135 static void
cb_user_stream_init(CbUserStream * self)136 cb_user_stream_init (CbUserStream *self)
137 {
138   self->receivers = g_ptr_array_new ();
139   self->restarting = FALSE;
140   self->state = STATE_STOPPED;
141   self->proxy_data_set = FALSE;
142 
143   self->network_monitor = g_network_monitor_get_default ();
144   self->network_available = g_network_monitor_get_network_available (self->network_monitor);
145   self->network_changed_id = g_signal_connect (self->network_monitor,
146                                                "network-changed",
147                                                G_CALLBACK (network_changed_cb), self);
148 
149   if (!self->network_available)
150     start_network_timeout (self);
151 }
152 
153 static void
cb_user_stream_class_init(CbUserStreamClass * klass)154 cb_user_stream_class_init (CbUserStreamClass *klass)
155 {
156   GObjectClass *object_class = G_OBJECT_CLASS (klass);
157 
158   object_class->finalize = cb_user_stream_finalize;
159 
160   user_stream_signals[INTERRUPTED] = g_signal_new ("interrupted",
161                                                    G_OBJECT_CLASS_TYPE (object_class),
162                                                    G_SIGNAL_RUN_FIRST,
163                                                    0,
164                                                    NULL, NULL,
165                                                    NULL, G_TYPE_NONE, 0);
166 
167   user_stream_signals[RESUMED] = g_signal_new ("resumed",
168                                                 G_OBJECT_CLASS_TYPE (object_class),
169                                                 G_SIGNAL_RUN_FIRST,
170                                                 0,
171                                                 NULL, NULL,
172                                                 NULL, G_TYPE_NONE, 0);
173 }
174 
175 CbUserStream *
cb_user_stream_new(const char * account_name,OAuthProxy * proxy)176 cb_user_stream_new (const char *account_name, OAuthProxy *proxy)
177 {
178   CbUserStream *self = CB_USER_STREAM (g_object_new (CB_TYPE_USER_STREAM, NULL));
179   self->account_name = g_strdup (account_name);
180   self->proxy = REST_PROXY(proxy);
181 
182   g_debug ("Creating stream for %s", account_name);
183 
184   return self;
185 }
186 
187 void
stream_tweet(CbUserStream * self,CbStreamMessageType message_type,JsonNode * node)188 stream_tweet (CbUserStream *self,
189               CbStreamMessageType  message_type,
190               JsonNode            *node) {
191   guint i;
192 
193 #if DEBUG
194   JsonGenerator *gen = json_generator_new ();
195   json_generator_set_root (gen, node);
196   json_generator_set_pretty (gen, FALSE);
197   gchar *json_dump = json_generator_to_data (gen, NULL);
198   g_debug ("Message with type %d on stream @%s: %s", message_type, self->account_name, json_dump);
199 #else
200   g_debug ("Message with type %d on stream @%s", message_type, self->account_name);
201 #endif
202 
203   if (message_type == CB_STREAM_MESSAGE_UNSUPPORTED) {
204     g_debug ("Skipped unsupported message on stream @%s\n", self->account_name);
205     return;
206   }
207 
208   for (i = 0; i < self->receivers->len; i++) {
209     cb_message_receiver_stream_message_received (g_ptr_array_index (self->receivers, i),
210                                                   message_type,
211                                                   node);
212   }
213 }
214 
215 void
cb_user_stream_inject_tweet(CbUserStream * self,CbStreamMessageType message_type,const gchar * content)216 cb_user_stream_inject_tweet (CbUserStream *self,
217               CbStreamMessageType  message_type,
218               const gchar *content) {
219   JsonParser *parser;
220   JsonNode *root_node;
221   JsonObject *root_obj;
222   GError *error = NULL;
223 
224   parser = json_parser_new ();
225   json_parser_load_from_data (parser, content, -1, &error);
226   if (error)
227     {
228       g_warning("Failed to parse %s", content);
229       return;
230     }
231   root_node = json_parser_get_root (parser);
232   root_obj = json_node_get_object (root_node);
233 
234   if (json_object_has_member (root_obj, "event"))
235     {
236       // We've got a single DM, which is wrapped, so unwrap it
237       root_node = json_object_get_member (root_obj, "event");
238     }
239   else if (json_object_has_member (root_obj, "quoted_status_permalink"))
240     {
241       // Quote tweets don't include the quoted tweet URL in the returned text, but they do when they come in the timeline
242       // So we need to fudge it here and add the URL entity and the text before we send it to the app
243       JsonObject *entities;
244       JsonArray *urls;
245       JsonObject *permalink = json_object_get_object_member (root_obj, "quoted_status_permalink");
246       const gchar *quoted_url = json_object_get_string_member (permalink, "url");
247       entities      = json_object_get_object_member (root_obj, "entities");
248       urls          = json_object_get_array_member (entities, "urls");
249       gboolean url_found = FALSE;
250 
251       for (guint i  = 0, p = json_array_get_length (urls); i < p; i ++)
252         {
253           JsonObject *url_obj = json_node_get_object (json_array_get_element (urls, i));
254           const char *url = json_object_get_string_member (url_obj, "url");
255 
256           if (!g_strcmp0 (url, quoted_url))
257             {
258               url_found = TRUE;
259               break;
260             }
261         }
262 
263       if (!url_found)
264         {
265           // Get the old text length
266           JsonArray *display_range = json_object_get_array_member (root_obj, "display_text_range");
267           guint64 old_length = json_array_get_int_element (display_range, 1);
268 
269           // Create and set the new text
270           gchar *new_full_text = g_strdup_printf ("%s %s", json_object_get_string_member (root_obj, "full_text"), quoted_url);
271           json_object_set_string_member (root_obj, "full_text", new_full_text);
272           guint64 old_length_with_space = old_length + 1;
273           guint64 new_length = old_length_with_space + short_url_length;
274           g_free (new_full_text);
275 
276           // Build the URL entity
277           JsonObject *url_obj = json_object_new ();
278           json_object_set_string_member (url_obj, "url", quoted_url);
279           json_object_set_string_member (url_obj, "expanded_url", json_object_get_string_member (permalink, "expanded"));
280           json_object_set_string_member (url_obj, "display_url", json_object_get_string_member (permalink, "display"));
281           JsonArray *indicies = json_array_sized_new (2);
282           json_array_add_int_element (indicies, old_length_with_space);
283           json_array_add_int_element (indicies, new_length);
284           json_object_set_array_member (url_obj, "indices", indicies);
285           json_array_add_object_element (urls, url_obj);
286 
287           // Update the text length
288           json_array_remove_element (display_range, 1);
289           json_array_add_int_element (display_range, new_length);
290         }
291     }
292 
293   stream_tweet (self, message_type, root_node);
294 }
295 
296 // TODO: Refactor a common "load_tweets_done" that parses, sets the last ID and sends the right message type
297 void
load_timeline_tweets_done(GObject * source_object,GAsyncResult * result,gpointer user_data)298 load_timeline_tweets_done  (GObject *source_object,
299                         GAsyncResult *result,
300                         gpointer user_data) {
301   CbUserStream *self = user_data;
302   GError *error = NULL;
303 
304   JsonNode *root_node;
305   JsonArray *root_arr;
306   guint len;
307 
308   root_node = cb_utils_load_threaded_finish (result, &error);
309 
310   if (error != NULL)
311     {
312       g_warning ("%s: %s (%s - %d)", __FUNCTION__, error->message, g_quark_to_string (error->domain), error->code);
313       if (error->domain == REST_PROXY_ERROR && error->code == REST_PROXY_ERROR_SSL) {
314         g_debug ("Reloading timeline on SSL failure");
315         load_timeline_tweets (self);
316       }
317       return;
318     }
319 
320   root_arr = json_node_get_array (root_node);
321   len = json_array_get_length (root_arr);
322 
323   g_debug ("Got %d timeline tweets", len);
324   gboolean first_load = self->last_home_id == 0;
325 
326   for (guint i = len; i > 0; i--) {
327     JsonNode *node = json_array_get_element (root_arr, i - 1);
328     JsonObject *obj = json_node_get_object (node);
329     self->last_home_id = json_object_get_int_member (obj, "id");
330     stream_tweet(self, CB_STREAM_MESSAGE_TWEET, node);
331   }
332 
333   if (first_load) {
334     stream_tweet (self, CB_STREAM_MESSAGE_TIMELINE_LOADED, json_node_new(JSON_NODE_NULL));
335   }
336 
337   g_cancellable_cancel(self->home_cancellable);
338   self->home_cancellable = NULL;
339 }
340 
341 gboolean
load_timeline_tweets(gpointer user_data)342 load_timeline_tweets (gpointer user_data)
343 {
344   CbUserStream *self = user_data;
345 
346   if (self->home_cancellable && ! g_cancellable_is_cancelled(self->home_cancellable)) {
347     g_debug ("Cancelling existing timeline cancellable");
348     g_cancellable_cancel(self->home_cancellable);
349   }
350 
351   gboolean is_first_load = self->last_home_id == 0;
352   char* requested_tweet_count = is_first_load ? "28" : "200";
353   RestProxyCall *proxy_call = rest_proxy_new_call (self->proxy);
354   g_debug("Loading timeline tweets");
355   rest_proxy_call_set_function (proxy_call, "1.1/statuses/home_timeline.json");
356   rest_proxy_call_set_method (proxy_call, "GET");
357   rest_proxy_call_add_param (proxy_call, "count", requested_tweet_count);
358   rest_proxy_call_add_param (proxy_call, "contributor_details", "true");
359   rest_proxy_call_add_param (proxy_call, "include_my_retweet", "true");
360   rest_proxy_call_add_param (proxy_call, "tweet_mode", "extended");
361   rest_proxy_call_add_param (proxy_call, "include_ext_alt_text", "true");
362 
363   if (!is_first_load) {
364     char since_id [21];
365     // We may occasionally miss tweets (bug #147). This appears to be because of eventual consistency (tweets appear at the server
366     // that we query *after* our last query but are timestamped *before* the 'since' ID for that query). So we need to try and overlap a bit.
367     // Tweet IDs are "snowflakes" with 12 bits of sequence (lowest), 5 bits of worker ID, 5 bits of data centre, and then the timestamp.
368     // https://github.com/twitter-archive/snowflake/blob/snowflake-2010/src/main/scala/com/twitter/service/snowflake/IdWorker.scala#L27-L36
369     // The timestamp is to millisecond accuracy, so we want to ignore the last three base-10 digits. Plus more digits to give more than
370     // one second of overlap. 10 bits is ~1s and every extra bit is double that.
371     // We mask the ID with that value to see whether we get any missed tweets.
372     // Note: this will result in at least the last tweet being reloaded each time.
373     gint timestamp_shift = 5 + 5 + 12;
374     gint overlap_shift = 13; // 13bits ~= 8 seconds
375     sprintf(since_id, "%" G_GINT64_FORMAT, self->last_home_id & (-1L << (timestamp_shift + overlap_shift)));
376     rest_proxy_call_add_param(proxy_call, "since_id", since_id);
377   }
378 
379   self->home_cancellable = g_cancellable_new();
380   cb_utils_load_threaded_async (proxy_call, self->home_cancellable, load_timeline_tweets_done, self);
381   return TRUE;
382 }
383 
384 void
load_mentions_tweets_done(GObject * source_object,GAsyncResult * result,gpointer user_data)385 load_mentions_tweets_done  (GObject *source_object,
386                         GAsyncResult *result,
387                         gpointer user_data) {
388   CbUserStream *self = user_data;
389   GError *error = NULL;
390 
391   JsonNode *root_node;
392   JsonArray *root_arr;
393   guint len;
394 
395   root_node = cb_utils_load_threaded_finish (result, &error);
396 
397   if (error != NULL)
398     {
399       g_warning ("%s: %s (%s - %d)", __FUNCTION__, error->message, g_quark_to_string (error->domain), error->code);
400       if (error->domain == REST_PROXY_ERROR && error->code == REST_PROXY_ERROR_SSL) {
401         g_debug ("Reloading mentions on SSL failure");
402         load_mentions_tweets (self);
403       }
404       return;
405     }
406 
407   root_arr = json_node_get_array (root_node);
408   len = json_array_get_length (root_arr);
409 
410   g_debug ("Got %d mention tweets", len);
411   gboolean first_load = self->last_mentions_id == 0;
412 
413   for (guint i = len; i > 0; i--) {
414     JsonNode *node = json_array_get_element (root_arr, i - 1);
415     JsonObject *obj = json_node_get_object (node);
416     self->last_mentions_id = json_object_get_int_member (obj, "id");
417     stream_tweet(self, CB_STREAM_MESSAGE_MENTION, node);
418   }
419 
420   if (first_load) {
421     stream_tweet (self, CB_STREAM_MESSAGE_MENTIONS_LOADED, json_node_new(JSON_NODE_NULL));
422   }
423 
424   g_cancellable_cancel(self->mentions_cancellable);
425   self->mentions_cancellable = NULL;
426 }
427 
428 gboolean
load_mentions_tweets(gpointer user_data)429 load_mentions_tweets (gpointer user_data)
430 {
431   CbUserStream *self = user_data;
432 
433   if (self->mentions_cancellable && ! g_cancellable_is_cancelled(self->mentions_cancellable)) {
434     g_debug ("Cancelling existing mentions cancellable");
435     g_cancellable_cancel(self->mentions_cancellable);
436   }
437 
438   gboolean is_first_load = self->last_mentions_id == 0;
439   char* requested_tweet_count = is_first_load ? "28" : "200";
440   RestProxyCall *proxy_call = rest_proxy_new_call (self->proxy);
441   g_debug("Loading mention tweets");
442   rest_proxy_call_set_function (proxy_call, "1.1/statuses/mentions_timeline.json");
443   rest_proxy_call_set_method (proxy_call, "GET");
444   rest_proxy_call_add_param (proxy_call, "count", requested_tweet_count);
445   rest_proxy_call_add_param (proxy_call, "contributor_details", "true");
446   rest_proxy_call_add_param (proxy_call, "include_my_retweet", "true");
447   rest_proxy_call_add_param (proxy_call, "include_entities", "true");
448   rest_proxy_call_add_param (proxy_call, "tweet_mode", "extended");
449   rest_proxy_call_add_param (proxy_call, "include_ext_alt_text", "true");
450 
451   if (!is_first_load) {
452     char since_id [20];
453     sprintf(since_id, "%ld", self->last_mentions_id);
454     rest_proxy_call_add_param(proxy_call, "since_id", since_id);
455   }
456 
457   self->mentions_cancellable = g_cancellable_new();
458   cb_utils_load_threaded_async (proxy_call, self->mentions_cancellable, load_mentions_tweets_done, self);
459   return TRUE;
460 }
461 
462 void
load_favourited_tweets_done(GObject * source_object,GAsyncResult * result,gpointer user_data)463 load_favourited_tweets_done  (GObject *source_object,
464                         GAsyncResult *result,
465                         gpointer user_data) {
466   CbUserStream *self = user_data;
467   GError *error = NULL;
468 
469   JsonNode *root_node;
470   JsonArray *root_arr;
471   guint len;
472 
473   root_node = cb_utils_load_threaded_finish (result, &error);
474 
475   if (error != NULL)
476     {
477       g_warning ("%s: %s (%s - %d)", __FUNCTION__, error->message, g_quark_to_string (error->domain), error->code);
478       if (error->domain == REST_PROXY_ERROR && error->code == REST_PROXY_ERROR_SSL) {
479         g_debug ("Reloading favorited on SSL failure");
480         load_favourited_tweets (self);
481       }
482       return;
483     }
484 
485   root_arr = json_node_get_array (root_node);
486   len = json_array_get_length (root_arr);
487 
488   g_debug ("Got %d favourited tweets", len);
489   gboolean first_load = self->last_favourited_id == 0;
490 
491   for (guint i = len; i > 0; i--) {
492     JsonNode *node = json_array_get_element (root_arr, i - 1);
493     JsonObject *obj = json_node_get_object (node);
494     self->last_favourited_id = json_object_get_int_member (obj, "id");
495     stream_tweet(self, CB_STREAM_MESSAGE_EVENT_FAVORITE, node);
496   }
497 
498   if (first_load) {
499     stream_tweet (self, CB_STREAM_MESSAGE_FAVORITES_LOADED, json_node_new(JSON_NODE_NULL));
500   }
501 
502   g_cancellable_cancel(self->favourited_cancellable);
503   self->favourited_cancellable = NULL;
504 }
505 
506 gboolean
load_favourited_tweets(gpointer user_data)507 load_favourited_tweets (gpointer user_data)
508 {
509   CbUserStream *self = user_data;
510 
511   if (self->favourited_cancellable && ! g_cancellable_is_cancelled(self->favourited_cancellable)) {
512     g_debug ("Cancelling existing favourites cancellable");
513     g_cancellable_cancel(self->favourited_cancellable);
514   }
515 
516   gboolean is_first_load = self->last_favourited_id == 0;
517   char* requested_tweet_count = is_first_load ? "28" : "200";
518   RestProxyCall *proxy_call = rest_proxy_new_call (self->proxy);
519   g_debug("Loading favourited tweets");
520   rest_proxy_call_set_function (proxy_call, "1.1/favorites/list.json");
521   rest_proxy_call_set_method (proxy_call, "GET");
522   rest_proxy_call_add_param (proxy_call, "count", requested_tweet_count);
523   rest_proxy_call_add_param (proxy_call, "contributor_details", "true");
524   rest_proxy_call_add_param (proxy_call, "include_my_retweet", "true");
525   rest_proxy_call_add_param (proxy_call, "include_entities", "true");
526   rest_proxy_call_add_param (proxy_call, "tweet_mode", "extended");
527   rest_proxy_call_add_param (proxy_call, "include_ext_alt_text", "true");
528 
529   if (!is_first_load) {
530     char since_id [20];
531     sprintf(since_id, "%ld", self->last_favourited_id);
532     rest_proxy_call_add_param(proxy_call, "since_id", since_id);
533   }
534 
535   self->favourited_cancellable = g_cancellable_new();
536   cb_utils_load_threaded_async (proxy_call, self->favourited_cancellable, load_favourited_tweets_done, self);
537   return TRUE;
538 }
539 
540 // Fix a cyclic definition
541 void
542 load_dm_tweets_with_cursor (gpointer user_data, const gchar *cursor);
543 
544 void
load_dm_tweets_done(GObject * source_object,GAsyncResult * result,gpointer user_data)545 load_dm_tweets_done  (GObject *source_object,
546                         GAsyncResult *result,
547                         gpointer user_data) {
548   CbUserStream *self = user_data;
549   GError *error = NULL;
550 
551   JsonNode *root_node;
552   JsonObject *root_obj;
553   JsonArray *root_arr;
554   guint len;
555 
556   root_node = cb_utils_load_threaded_finish (result, &error);
557 
558   if (error != NULL)
559     {
560       g_warning ("%s: %s (%s - %d)", __FUNCTION__, error->message, g_quark_to_string (error->domain), error->code);
561       if (error->domain == REST_PROXY_ERROR && error->code == REST_PROXY_ERROR_SSL) {
562         g_debug ("Reloading DMs on SSL failure");
563         load_dm_tweets (self);
564       }
565       return;
566     }
567 
568   root_obj = json_node_get_object (root_node);
569   root_arr = json_object_get_array_member(root_obj, "events");
570   len = json_array_get_length (root_arr);
571 
572   // TODO: Look for a "next_cursor" and load older DMs
573   // https://developer.twitter.com/en/docs/direct-messages/sending-and-receiving/api-reference/list-events
574 
575   g_debug ("Got %d DMs", len);
576 
577   gboolean all_newer = TRUE;
578   gboolean all_older = TRUE;
579 
580   for (guint i = len; i > 0; i--) {
581     JsonNode *node = json_array_get_element (root_arr, i - 1);
582     JsonObject *obj = json_node_get_object (node);
583     int message_type = CB_STREAM_MESSAGE_UNSUPPORTED;
584     const gchar *type = json_object_get_string_member(obj, "type");
585 
586     if (strcmp(type, "message_create") == 0) {
587       message_type = CB_STREAM_MESSAGE_DIRECT_MESSAGE;
588     }
589 
590     gint64 id = strtol (json_object_get_string_member (obj, "id"), NULL, 10);
591 
592     if (id < self->first_dm_id) {
593       self->first_dm_id = id;
594       all_newer = FALSE;
595     }
596     else if (id <= self->last_dm_id) {
597       all_older = FALSE;
598       all_newer = FALSE;
599       // DMs behave differently to other "timelines" so we need to ignore messages we've seen
600       // And we assume we've seen it if it has an older ID. But we can't break because later
601       // in the collection is newer and might be unseen.
602       continue;
603     }
604     else {
605       all_older = FALSE;
606       if (id > self->new_last_dm_id) {
607         self->new_last_dm_id = id;
608       }
609     }
610 
611     stream_tweet (self, message_type, node);
612   }
613 
614   g_cancellable_cancel(self->dm_cancellable);
615   self->dm_cancellable = NULL;
616 
617   gboolean first_load = self->last_dm_id == 0;
618   // Limit fetches for throttling. 5 at first load lets us load ~300 old DMs from last 30 days.
619   // 1 recursion when scheduled every 2 minutes runs right to the "15/15min" limit.
620   // This *should* only cause throttling problems just when someone has a big backlog AND has
621   // lots of tweets coming in every 2 minutes.
622   unsigned char max_recursions = first_load ? 5 : 1;
623 
624   if ((all_newer || all_older) && self->dm_recursions < max_recursions && json_object_has_member(root_obj, "next_cursor")) {
625     self->dm_recursions++;
626     const gchar *cursor = json_object_get_string_member(root_obj, "next_cursor");
627     load_dm_tweets_with_cursor(user_data, cursor);
628   }
629   else {
630     if (first_load) {
631       stream_tweet (self, CB_STREAM_MESSAGE_DIRECT_MESSAGES_LOADED, json_node_new(JSON_NODE_NULL));
632     }
633 
634     self->last_dm_id = self->new_last_dm_id;
635     self->dm_recursions = 0;
636   }
637 }
638 
639 void
load_dm_tweets_with_cursor(gpointer user_data,const gchar * cursor)640 load_dm_tweets_with_cursor (gpointer user_data, const gchar *cursor)
641 {
642   CbUserStream *self = user_data;
643 
644   if (self->dm_cancellable && ! g_cancellable_is_cancelled(self->dm_cancellable)) {
645     g_debug ("Cancelling existing cancellable");
646     g_cancellable_cancel(self->dm_cancellable);
647   }
648 
649   RestProxyCall *proxy_call = rest_proxy_new_call (self->proxy);
650   rest_proxy_call_set_function (proxy_call, "1.1/direct_messages/events/list.json");
651   rest_proxy_call_set_method (proxy_call, "GET");
652   rest_proxy_call_add_param (proxy_call, "count", "50");
653   if (cursor) {
654     rest_proxy_call_add_param(proxy_call, "cursor", cursor);
655   }
656   g_debug("Loading DM tweets for cursor %s", cursor ? cursor : "none");
657 
658   self->dm_cancellable = g_cancellable_new();
659   cb_utils_load_threaded_async (proxy_call, self->dm_cancellable, load_dm_tweets_done, self);
660 }
661 
662 gboolean
load_dm_tweets(gpointer user_data)663 load_dm_tweets (gpointer user_data)
664 {
665   load_dm_tweets_with_cursor(user_data, NULL);
666   return TRUE;
667 }
668 
669 void
cb_user_stream_start(CbUserStream * self)670 cb_user_stream_start (CbUserStream *self)
671 {
672   g_debug("Loading timeline tweets on start");
673   load_timeline_tweets (self);
674   g_debug("Loading mention tweets on start");
675   load_mentions_tweets (self);
676   g_debug("Loading favourited tweets on start");
677   load_favourited_tweets (self);
678   g_debug("Loading DMs on start");
679   load_dm_tweets (self);
680 
681   if (!self->timeline_timeout) {
682     g_debug("Adding timeout for timeline");
683     self->timeline_timeout = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 60 * 2, load_timeline_tweets, self, NULL);
684   }
685   if (!self->mentions_timeout) {
686     g_debug("Adding timeout for mentions");
687     self->mentions_timeout = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 60 * 2, load_mentions_tweets, self, NULL);
688   }
689   if (!self->favourited_timeout) {
690     g_debug("Adding timeout for favourites");
691     self->favourited_timeout = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 60 * 2, load_favourited_tweets, self, NULL);
692   }
693   if (!self->dm_timeout) {
694     g_debug("Adding timeout for DMs");
695     self->dm_timeout = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 60 * 2, load_dm_tweets, self, NULL);
696   }
697 }
698 
/*
 * Removes the four polling timers, if installed, and resets their ids so
 * cb_user_stream_start() can install them again. Requests that are already
 * in flight are not touched.
 */
void
cb_user_stream_stop (CbUserStream *self)
{
  if (self->timeline_timeout != 0)
    {
      g_source_remove (self->timeline_timeout);
      self->timeline_timeout = 0;
    }

  if (self->mentions_timeout != 0)
    {
      g_source_remove (self->mentions_timeout);
      self->mentions_timeout = 0;
    }

  if (self->favourited_timeout != 0)
    {
      g_source_remove (self->favourited_timeout);
      self->favourited_timeout = 0;
    }

  if (self->dm_timeout != 0)
    {
      g_source_remove (self->dm_timeout);
      self->dm_timeout = 0;
    }
}
718 
719 void
cb_user_stream_set_proxy_data(CbUserStream * self,const char * token,const char * token_secret)720 cb_user_stream_set_proxy_data (CbUserStream *self,
721                                const char   *token,
722                                const char   *token_secret)
723 {
724 //  oauth_proxy_set_token (OAUTH_PROXY (self->proxy), token);
725 //  oauth_proxy_set_token_secret (OAUTH_PROXY (self->proxy), token_secret);
726 
727   self->proxy_data_set = TRUE;
728 }
729 
730 void
cb_user_stream_register(CbUserStream * self,CbMessageReceiver * receiver)731 cb_user_stream_register (CbUserStream      *self,
732                          CbMessageReceiver *receiver)
733 {
734   g_ptr_array_add (self->receivers, receiver);
735 }
736 
737 void
cb_user_stream_unregister(CbUserStream * self,CbMessageReceiver * receiver)738 cb_user_stream_unregister (CbUserStream      *self,
739                            CbMessageReceiver *receiver)
740 {
741   guint i;
742 
743   for (i = 0; i < self->receivers->len; i ++)
744     {
745       CbMessageReceiver *r = g_ptr_array_index (self->receivers, i);
746 
747       if (r == receiver)
748         {
749           g_ptr_array_remove_index_fast (self->receivers, i);
750           break;
751         }
752     }
753 }