1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/TopDialogManager.h"
8 
9 #include "td/telegram/AccessRights.h"
10 #include "td/telegram/AuthManager.h"
11 #include "td/telegram/ConfigShared.h"
12 #include "td/telegram/ContactsManager.h"
13 #include "td/telegram/DialogId.h"
14 #include "td/telegram/Global.h"
15 #include "td/telegram/logevent/LogEvent.h"
16 #include "td/telegram/MessagesManager.h"
17 #include "td/telegram/misc.h"
18 #include "td/telegram/net/NetQuery.h"
19 #include "td/telegram/net/NetQueryDispatcher.h"
20 #include "td/telegram/StateManager.h"
21 #include "td/telegram/Td.h"
22 #include "td/telegram/TdDb.h"
23 #include "td/telegram/TdParameters.h"
24 
25 #include "td/utils/algorithm.h"
26 #include "td/utils/buffer.h"
27 #include "td/utils/logging.h"
28 #include "td/utils/misc.h"
29 #include "td/utils/port/Clocks.h"
30 #include "td/utils/SliceBuilder.h"
31 #include "td/utils/Status.h"
32 #include "td/utils/tl_helpers.h"
33 
34 #include <algorithm>
35 #include <cmath>
36 #include <iterator>
37 
38 namespace td {
39 
40 class GetTopPeersQuery final : public Td::ResultHandler {
41   Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> promise_;
42 
43  public:
GetTopPeersQuery(Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> && promise)44   explicit GetTopPeersQuery(Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> &&promise)
45       : promise_(std::move(promise)) {
46   }
47 
send(int64 hash)48   void send(int64 hash) {
49     int32 flags =
50         telegram_api::contacts_getTopPeers::CORRESPONDENTS_MASK | telegram_api::contacts_getTopPeers::BOTS_PM_MASK |
51         telegram_api::contacts_getTopPeers::BOTS_INLINE_MASK | telegram_api::contacts_getTopPeers::GROUPS_MASK |
52         telegram_api::contacts_getTopPeers::CHANNELS_MASK | telegram_api::contacts_getTopPeers::PHONE_CALLS_MASK |
53         telegram_api::contacts_getTopPeers::FORWARD_USERS_MASK | telegram_api::contacts_getTopPeers::FORWARD_CHATS_MASK;
54     send_query(G()->net_query_creator().create(telegram_api::contacts_getTopPeers(
55         flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
56         false /*ignored*/, false /*ignored*/, false /*ignored*/, 0 /*offset*/, 100 /*limit*/, hash)));
57   }
58 
on_result(BufferSlice packet)59   void on_result(BufferSlice packet) final {
60     auto result_ptr = fetch_result<telegram_api::contacts_getTopPeers>(packet);
61     if (result_ptr.is_error()) {
62       return on_error(result_ptr.move_as_error());
63     }
64 
65     promise_.set_value(result_ptr.move_as_ok());
66   }
67 
on_error(Status status)68   void on_error(Status status) final {
69     promise_.set_error(std::move(status));
70   }
71 };
72 
73 class ToggleTopPeersQuery final : public Td::ResultHandler {
74   Promise<Unit> promise_;
75 
76  public:
ToggleTopPeersQuery(Promise<Unit> && promise)77   explicit ToggleTopPeersQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
78   }
79 
send(bool is_enabled)80   void send(bool is_enabled) {
81     send_query(G()->net_query_creator().create(telegram_api::contacts_toggleTopPeers(is_enabled)));
82   }
83 
on_result(BufferSlice packet)84   void on_result(BufferSlice packet) final {
85     auto result_ptr = fetch_result<telegram_api::contacts_toggleTopPeers>(packet);
86     if (result_ptr.is_error()) {
87       return on_error(result_ptr.move_as_error());
88     }
89 
90     promise_.set_value(Unit());
91   }
92 
on_error(Status status)93   void on_error(Status status) final {
94     promise_.set_error(std::move(status));
95   }
96 };
97 
98 class ResetTopPeerRatingQuery final : public Td::ResultHandler {
99   DialogId dialog_id_;
100 
101  public:
send(TopDialogCategory category,DialogId dialog_id)102   void send(TopDialogCategory category, DialogId dialog_id) {
103     auto input_peer = td_->messages_manager_->get_input_peer(dialog_id, AccessRights::Read);
104     if (input_peer == nullptr) {
105       return;
106     }
107 
108     dialog_id_ = dialog_id;
109     send_query(G()->net_query_creator().create(
110         telegram_api::contacts_resetTopPeerRating(get_input_top_peer_category(category), std::move(input_peer))));
111   }
112 
on_result(BufferSlice packet)113   void on_result(BufferSlice packet) final {
114     auto result_ptr = fetch_result<telegram_api::contacts_resetTopPeerRating>(packet);
115     if (result_ptr.is_error()) {
116       return on_error(result_ptr.move_as_error());
117     }
118 
119     // ignore the result
120   }
121 
on_error(Status status)122   void on_error(Status status) final {
123     if (!td_->messages_manager_->on_get_dialog_error(dialog_id_, status, "ResetTopPeerRatingQuery")) {
124       LOG(INFO) << "Receive error for ResetTopPeerRatingQuery: " << status;
125     }
126   }
127 };
128 
TopDialogManager(Td * td,ActorShared<> parent)129 TopDialogManager::TopDialogManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) {
130 }
131 
update_is_enabled(bool is_enabled)132 void TopDialogManager::update_is_enabled(bool is_enabled) {
133   if (td_->auth_manager_ == nullptr || !td_->auth_manager_->is_authorized() || td_->auth_manager_->is_bot()) {
134     return;
135   }
136 
137   if (set_is_enabled(is_enabled)) {
138     G()->td_db()->get_binlog_pmc()->set("top_peers_enabled", is_enabled ? "1" : "0");
139     send_toggle_top_peers(is_enabled);
140 
141     loop();
142   }
143 }
144 
set_is_enabled(bool is_enabled)145 bool TopDialogManager::set_is_enabled(bool is_enabled) {
146   if (is_enabled_ == is_enabled) {
147     return false;
148   }
149 
150   LOG(DEBUG) << "Change top chats is_enabled to " << is_enabled;
151   is_enabled_ = is_enabled;
152   try_start();
153   return true;
154 }
155 
send_toggle_top_peers(bool is_enabled)156 void TopDialogManager::send_toggle_top_peers(bool is_enabled) {
157   if (G()->close_flag()) {
158     return;
159   }
160 
161   if (have_toggle_top_peers_query_) {
162     have_pending_toggle_top_peers_query_ = true;
163     pending_toggle_top_peers_query_ = is_enabled;
164     return;
165   }
166 
167   LOG(DEBUG) << "Send toggle top peers query to " << is_enabled;
168   have_toggle_top_peers_query_ = true;
169 
170   auto promise = PromiseCreator::lambda([actor_id = actor_id(this), is_enabled](Result<Unit> result) {
171     send_closure(actor_id, &TopDialogManager::on_toggle_top_peers, is_enabled, std::move(result));
172   });
173   td_->create_handler<ToggleTopPeersQuery>(std::move(promise))->send(is_enabled);
174 }
175 
on_toggle_top_peers(bool is_enabled,Result<Unit> && result)176 void TopDialogManager::on_toggle_top_peers(bool is_enabled, Result<Unit> &&result) {
177   CHECK(have_toggle_top_peers_query_);
178   have_toggle_top_peers_query_ = false;
179 
180   if (have_pending_toggle_top_peers_query_) {
181     have_pending_toggle_top_peers_query_ = false;
182     if (pending_toggle_top_peers_query_ != is_enabled) {
183       send_toggle_top_peers(pending_toggle_top_peers_query_);
184       return;
185     }
186   }
187 
188   if (result.is_ok()) {
189     // everything is synchronized
190     G()->td_db()->get_binlog_pmc()->erase("top_peers_enabled");
191   } else {
192     // let's resend the query forever
193     send_toggle_top_peers(is_enabled);
194   }
195   loop();
196 }
197 
on_dialog_used(TopDialogCategory category,DialogId dialog_id,int32 date)198 void TopDialogManager::on_dialog_used(TopDialogCategory category, DialogId dialog_id, int32 date) {
199   if (!is_active_ || !is_enabled_) {
200     return;
201   }
202   auto pos = static_cast<size_t>(category);
203   CHECK(pos < by_category_.size());
204   auto &top_dialogs = by_category_[pos];
205 
206   top_dialogs.is_dirty = true;
207   auto it = std::find_if(top_dialogs.dialogs.begin(), top_dialogs.dialogs.end(),
208                          [&](auto &top_dialog) { return top_dialog.dialog_id == dialog_id; });
209   if (it == top_dialogs.dialogs.end()) {
210     TopDialog top_dialog;
211     top_dialog.dialog_id = dialog_id;
212     top_dialogs.dialogs.push_back(top_dialog);
213     it = top_dialogs.dialogs.end() - 1;
214   }
215 
216   auto delta = rating_add(date, top_dialogs.rating_timestamp);
217   it->rating += delta;
218   while (it != top_dialogs.dialogs.begin()) {
219     auto next = std::prev(it);
220     if (*next < *it) {
221       break;
222     }
223     std::swap(*next, *it);
224     it = next;
225   }
226 
227   LOG(INFO) << "Update " << get_top_dialog_category_name(category) << " rating of " << dialog_id << " by " << delta;
228 
229   if (!first_unsync_change_) {
230     first_unsync_change_ = Timestamp::now_cached();
231   }
232   loop();
233 }
234 
remove_dialog(TopDialogCategory category,DialogId dialog_id,Promise<Unit> && promise)235 void TopDialogManager::remove_dialog(TopDialogCategory category, DialogId dialog_id, Promise<Unit> &&promise) {
236   if (category == TopDialogCategory::Size) {
237     return promise.set_error(Status::Error(400, "Top chat category must be non-empty"));
238   }
239   if (!td_->messages_manager_->have_dialog_force(dialog_id, "remove_dialog")) {
240     return promise.set_error(Status::Error(400, "Chat not found"));
241   }
242   if (!is_active_ || !is_enabled_) {
243     return promise.set_value(Unit());
244   }
245 
246   if (category == TopDialogCategory::ForwardUsers && dialog_id.get_type() != DialogType::User) {
247     category = TopDialogCategory::ForwardChats;
248   }
249 
250   auto pos = static_cast<size_t>(category);
251   CHECK(pos < by_category_.size());
252   auto &top_dialogs = by_category_[pos];
253 
254   td_->create_handler<ResetTopPeerRatingQuery>()->send(category, dialog_id);
255 
256   auto it = std::find_if(top_dialogs.dialogs.begin(), top_dialogs.dialogs.end(),
257                          [&](auto &top_dialog) { return top_dialog.dialog_id == dialog_id; });
258   if (it == top_dialogs.dialogs.end()) {
259     return promise.set_value(Unit());
260   }
261 
262   top_dialogs.is_dirty = true;
263   top_dialogs.dialogs.erase(it);
264   if (!first_unsync_change_) {
265     first_unsync_change_ = Timestamp::now_cached();
266   }
267   loop();
268   promise.set_value(Unit());
269 }
270 
get_top_dialogs(TopDialogCategory category,int32 limit,Promise<vector<DialogId>> promise)271 void TopDialogManager::get_top_dialogs(TopDialogCategory category, int32 limit, Promise<vector<DialogId>> promise) {
272   if (category == TopDialogCategory::Size) {
273     return promise.set_error(Status::Error(400, "Top chat category must be non-empty"));
274   }
275   if (limit <= 0) {
276     return promise.set_error(Status::Error(400, "Limit must be positive"));
277   }
278   if (!is_active_) {
279     return promise.set_error(Status::Error(400, "Not supported without chat info database"));
280   }
281   if (!is_enabled_) {
282     return promise.set_error(Status::Error(400, "Top chats computation is disabled"));
283   }
284 
285   GetTopDialogsQuery query;
286   query.category = category;
287   query.limit = static_cast<size_t>(limit);
288   query.promise = std::move(promise);
289   pending_get_top_dialogs_.push_back(std::move(query));
290   loop();
291 }
292 
update_rating_e_decay()293 void TopDialogManager::update_rating_e_decay() {
294   if (!is_active_) {
295     return;
296   }
297   rating_e_decay_ = narrow_cast<int32>(G()->shared_config().get_option_integer("rating_e_decay", rating_e_decay_));
298 }
299 
300 template <class StorerT>
store(const TopDialogManager::TopDialog & top_dialog,StorerT & storer)301 void store(const TopDialogManager::TopDialog &top_dialog, StorerT &storer) {
302   using ::td::store;
303   store(top_dialog.dialog_id, storer);
304   store(top_dialog.rating, storer);
305 }
306 
307 template <class ParserT>
parse(TopDialogManager::TopDialog & top_dialog,ParserT & parser)308 void parse(TopDialogManager::TopDialog &top_dialog, ParserT &parser) {
309   using ::td::parse;
310   parse(top_dialog.dialog_id, parser);
311   parse(top_dialog.rating, parser);
312 }
313 
314 template <class StorerT>
store(const TopDialogManager::TopDialogs & top_dialogs,StorerT & storer)315 void store(const TopDialogManager::TopDialogs &top_dialogs, StorerT &storer) {
316   using ::td::store;
317   store(top_dialogs.rating_timestamp, storer);
318   store(top_dialogs.dialogs, storer);
319 }
320 
321 template <class ParserT>
parse(TopDialogManager::TopDialogs & top_dialogs,ParserT & parser)322 void parse(TopDialogManager::TopDialogs &top_dialogs, ParserT &parser) {
323   using ::td::parse;
324   parse(top_dialogs.rating_timestamp, parser);
325   parse(top_dialogs.dialogs, parser);
326 }
327 
rating_add(double now,double rating_timestamp) const328 double TopDialogManager::rating_add(double now, double rating_timestamp) const {
329   return std::exp((now - rating_timestamp) / rating_e_decay_);
330 }
331 
current_rating_add(double rating_timestamp) const332 double TopDialogManager::current_rating_add(double rating_timestamp) const {
333   return rating_add(G()->server_time_cached(), rating_timestamp);
334 }
335 
normalize_rating()336 void TopDialogManager::normalize_rating() {
337   for (auto &top_dialogs : by_category_) {
338     auto div_by = current_rating_add(top_dialogs.rating_timestamp);
339     top_dialogs.rating_timestamp = G()->server_time_cached();
340     for (auto &dialog : top_dialogs.dialogs) {
341       dialog.rating /= div_by;
342     }
343     top_dialogs.is_dirty = true;
344   }
345   db_sync_state_ = SyncState::None;
346 }
347 
do_get_top_dialogs(GetTopDialogsQuery && query)348 void TopDialogManager::do_get_top_dialogs(GetTopDialogsQuery &&query) {
349   vector<DialogId> dialog_ids;
350   if (query.category != TopDialogCategory::ForwardUsers) {
351     auto pos = static_cast<size_t>(query.category);
352     CHECK(pos < by_category_.size());
353     dialog_ids = transform(by_category_[pos].dialogs, [](const auto &x) { return x.dialog_id; });
354   } else {
355     // merge ForwardUsers and ForwardChats
356     auto &users = by_category_[static_cast<size_t>(TopDialogCategory::ForwardUsers)];
357     auto &chats = by_category_[static_cast<size_t>(TopDialogCategory::ForwardChats)];
358     size_t users_pos = 0;
359     size_t chats_pos = 0;
360     while (users_pos < users.dialogs.size() || chats_pos < chats.dialogs.size()) {
361       if (chats_pos == chats.dialogs.size() ||
362           (users_pos < users.dialogs.size() && users.dialogs[users_pos] < chats.dialogs[chats_pos])) {
363         dialog_ids.push_back(users.dialogs[users_pos++].dialog_id);
364       } else {
365         dialog_ids.push_back(chats.dialogs[chats_pos++].dialog_id);
366       }
367     }
368   }
369 
370   auto promise = PromiseCreator::lambda(
371       [actor_id = actor_id(this), query = std::move(query)](Result<vector<DialogId>> r_dialog_ids) mutable {
372         if (r_dialog_ids.is_error()) {
373           return query.promise.set_error(r_dialog_ids.move_as_error());
374         }
375         send_closure(actor_id, &TopDialogManager::on_load_dialogs, std::move(query), r_dialog_ids.move_as_ok());
376       });
377   td_->messages_manager_->load_dialogs(std::move(dialog_ids), std::move(promise));
378 }
379 
on_load_dialogs(GetTopDialogsQuery && query,vector<DialogId> && dialog_ids)380 void TopDialogManager::on_load_dialogs(GetTopDialogsQuery &&query, vector<DialogId> &&dialog_ids) {
381   auto limit = std::min({query.limit, MAX_TOP_DIALOGS_LIMIT, dialog_ids.size()});
382   vector<DialogId> result;
383   result.reserve(limit);
384   for (auto dialog_id : dialog_ids) {
385     if (dialog_id.get_type() == DialogType::User) {
386       auto user_id = dialog_id.get_user_id();
387       if (td_->contacts_manager_->is_user_deleted(user_id)) {
388         LOG(INFO) << "Skip deleted " << user_id;
389         continue;
390       }
391       if (td_->contacts_manager_->get_my_id() == user_id) {
392         LOG(INFO) << "Skip self " << user_id;
393         continue;
394       }
395       if (query.category == TopDialogCategory::BotInline || query.category == TopDialogCategory::BotPM) {
396         auto r_bot_info = td_->contacts_manager_->get_bot_data(user_id);
397         if (r_bot_info.is_error()) {
398           LOG(INFO) << "Skip not a bot " << user_id;
399           continue;
400         }
401         if (query.category == TopDialogCategory::BotInline &&
402             (r_bot_info.ok().username.empty() || !r_bot_info.ok().is_inline)) {
403           LOG(INFO) << "Skip not inline bot " << user_id;
404           continue;
405         }
406       }
407     }
408 
409     result.push_back(dialog_id);
410     if (result.size() == limit) {
411       break;
412     }
413   }
414 
415   query.promise.set_value(std::move(result));
416 }
417 
do_get_top_peers()418 void TopDialogManager::do_get_top_peers() {
419   std::vector<uint64> ids;
420   for (auto &category : by_category_) {
421     for (auto &top_dialog : category.dialogs) {
422       auto dialog_id = top_dialog.dialog_id;
423       switch (dialog_id.get_type()) {
424         case DialogType::User:
425           ids.push_back(dialog_id.get_user_id().get());
426           break;
427         case DialogType::Chat:
428           ids.push_back(dialog_id.get_chat_id().get());
429           break;
430         case DialogType::Channel:
431           ids.push_back(dialog_id.get_channel_id().get());
432           break;
433         default:
434           break;
435       }
436     }
437   }
438   auto promise = PromiseCreator::lambda(
439       [actor_id = actor_id(this)](Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result) {
440         send_closure(actor_id, &TopDialogManager::on_get_top_peers, std::move(result));
441       });
442   td_->create_handler<GetTopPeersQuery>(std::move(promise))->send(get_vector_hash(ids));
443 }
444 
on_get_top_peers(Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result)445 void TopDialogManager::on_get_top_peers(Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result) {
446   normalize_rating();  // once a day too
447 
448   if (result.is_error()) {
449     last_server_sync_ = Timestamp::in(SERVER_SYNC_RESEND_DELAY - SERVER_SYNC_DELAY);
450     loop();
451     return;
452   }
453 
454   last_server_sync_ = Timestamp::now();
455   server_sync_state_ = SyncState::Ok;
456 
457   auto top_peers_parent = result.move_as_ok();
458   LOG(DEBUG) << "Receive contacts_getTopPeers result: " << to_string(top_peers_parent);
459   switch (top_peers_parent->get_id()) {
460     case telegram_api::contacts_topPeersNotModified::ID:
461       // nothing to do
462       break;
463     case telegram_api::contacts_topPeersDisabled::ID:
464       G()->shared_config().set_option_boolean("disable_top_chats", true);
465       set_is_enabled(false);  // apply immediately
466       break;
467     case telegram_api::contacts_topPeers::ID: {
468       G()->shared_config().set_option_empty("disable_top_chats");
469       set_is_enabled(true);  // apply immediately
470       auto top_peers = move_tl_object_as<telegram_api::contacts_topPeers>(std::move(top_peers_parent));
471 
472       td_->contacts_manager_->on_get_users(std::move(top_peers->users_), "on get top chats");
473       td_->contacts_manager_->on_get_chats(std::move(top_peers->chats_), "on get top chats");
474       for (auto &category : top_peers->categories_) {
475         auto dialog_category = get_top_dialog_category(category->category_);
476         auto pos = static_cast<size_t>(dialog_category);
477         CHECK(pos < by_category_.size());
478         auto &top_dialogs = by_category_[pos];
479 
480         top_dialogs.is_dirty = true;
481         top_dialogs.dialogs.clear();
482         for (auto &top_peer : category->peers_) {
483           TopDialog top_dialog;
484           top_dialog.dialog_id = DialogId(top_peer->peer_);
485           top_dialog.rating = top_peer->rating_;
486           top_dialogs.dialogs.push_back(std::move(top_dialog));
487         }
488       }
489       db_sync_state_ = SyncState::None;
490       break;
491     }
492     default:
493       UNREACHABLE();
494   }
495 
496   G()->td_db()->get_binlog_pmc()->set("top_dialogs_ts", to_string(static_cast<uint32>(Clocks::system())));
497   loop();
498 }
499 
do_save_top_dialogs()500 void TopDialogManager::do_save_top_dialogs() {
501   LOG(INFO) << "Save top chats";
502   for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
503     auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
504     auto key = PSTRING() << "top_dialogs#" << get_top_dialog_category_name(top_dialog_category);
505 
506     auto &top_dialogs = by_category_[top_dialog_category_i];
507     if (!top_dialogs.is_dirty) {
508       continue;
509     }
510     top_dialogs.is_dirty = false;
511 
512     G()->td_db()->get_binlog_pmc()->set(key, log_event_store(top_dialogs).as_slice().str());
513   }
514   db_sync_state_ = SyncState::Ok;
515   first_unsync_change_ = Timestamp();
516 }
517 
start_up()518 void TopDialogManager::start_up() {
519   init();
520 }
521 
tear_down()522 void TopDialogManager::tear_down() {
523   parent_.reset();
524 }
525 
init()526 void TopDialogManager::init() {
527   if (td_->auth_manager_ == nullptr || !td_->auth_manager_->is_authorized()) {
528     return;
529   }
530 
531   is_active_ = G()->parameters().use_chat_info_db && !td_->auth_manager_->is_bot();
532   is_enabled_ = !G()->shared_config().get_option_boolean("disable_top_chats");
533   update_rating_e_decay();
534 
535   string need_update_top_peers = G()->td_db()->get_binlog_pmc()->get("top_peers_enabled");
536   if (!need_update_top_peers.empty()) {
537     send_toggle_top_peers(need_update_top_peers[0] == '1');
538   }
539 
540   try_start();
541   loop();
542 }
543 
try_start()544 void TopDialogManager::try_start() {
545   was_first_sync_ = false;
546   first_unsync_change_ = Timestamp();
547   server_sync_state_ = SyncState::None;
548   last_server_sync_ = Timestamp();
549   CHECK(pending_get_top_dialogs_.empty());
550 
551   LOG(DEBUG) << "Init is enabled: " << is_enabled_;
552   if (!is_active_) {
553     G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs");
554     return;
555   }
556 
557   auto di_top_dialogs_ts = G()->td_db()->get_binlog_pmc()->get("top_dialogs_ts");
558   if (!di_top_dialogs_ts.empty()) {
559     last_server_sync_ = Timestamp::in(to_integer<uint32>(di_top_dialogs_ts) - Clocks::system());
560     if (last_server_sync_.is_in_past()) {
561       server_sync_state_ = SyncState::Ok;
562     }
563   }
564 
565   if (is_enabled_) {
566     for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
567       auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
568       auto key = PSTRING() << "top_dialogs#" << get_top_dialog_category_name(top_dialog_category);
569       auto value = G()->td_db()->get_binlog_pmc()->get(key);
570 
571       auto &top_dialogs = by_category_[top_dialog_category_i];
572       top_dialogs.is_dirty = false;
573       if (value.empty()) {
574         continue;
575       }
576       log_event_parse(top_dialogs, value).ensure();
577     }
578     normalize_rating();
579   } else {
580     G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs#");
581     for (auto &top_dialogs : by_category_) {
582       top_dialogs.is_dirty = false;
583       top_dialogs.rating_timestamp = 0;
584       top_dialogs.dialogs.clear();
585     }
586   }
587   db_sync_state_ = SyncState::Ok;
588 
589   send_closure(G()->state_manager(), &StateManager::wait_first_sync,
590                PromiseCreator::event(self_closure(this, &TopDialogManager::on_first_sync)));
591 }
592 
on_first_sync()593 void TopDialogManager::on_first_sync() {
594   was_first_sync_ = true;
595   if (!G()->close_flag() && td_->auth_manager_->is_bot()) {
596     is_active_ = false;
597     try_start();
598   }
599   loop();
600 }
601 
loop()602 void TopDialogManager::loop() {
603   if (!is_active_ || G()->close_flag()) {
604     return;
605   }
606 
607   if (!pending_get_top_dialogs_.empty()) {
608     for (auto &query : pending_get_top_dialogs_) {
609       do_get_top_dialogs(std::move(query));
610     }
611     pending_get_top_dialogs_.clear();
612   }
613 
614   // server sync
615   Timestamp server_sync_timeout;
616   if (server_sync_state_ == SyncState::Ok) {
617     server_sync_timeout = Timestamp::at(last_server_sync_.at() + SERVER_SYNC_DELAY);
618     if (server_sync_timeout.is_in_past()) {
619       server_sync_state_ = SyncState::None;
620     }
621   }
622 
623   Timestamp wakeup_timeout;
624   if (server_sync_state_ == SyncState::Ok) {
625     wakeup_timeout.relax(server_sync_timeout);
626   } else if (server_sync_state_ == SyncState::None && was_first_sync_) {
627     server_sync_state_ = SyncState::Pending;
628     do_get_top_peers();
629   }
630 
631   if (is_enabled_) {
632     // database sync
633     Timestamp db_sync_timeout;
634     if (db_sync_state_ == SyncState::Ok) {
635       if (first_unsync_change_) {
636         db_sync_timeout = Timestamp::at(first_unsync_change_.at() + DB_SYNC_DELAY);
637         if (db_sync_timeout.is_in_past()) {
638           db_sync_state_ = SyncState::None;
639         }
640       }
641     }
642 
643     if (db_sync_state_ == SyncState::Ok) {
644       wakeup_timeout.relax(db_sync_timeout);
645     } else if (db_sync_state_ == SyncState::None) {
646       if (server_sync_state_ == SyncState::Ok) {
647         do_save_top_dialogs();
648       }
649     }
650   }
651 
652   if (wakeup_timeout) {
653     LOG(INFO) << "Wakeup in: " << wakeup_timeout.in();
654     set_timeout_at(wakeup_timeout.at());
655   } else {
656     LOG(INFO) << "Wakeup: never";
657     cancel_timeout();
658   }
659 }
660 
661 }  // namespace td
662