1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/TopDialogManager.h"
8
9 #include "td/telegram/AccessRights.h"
10 #include "td/telegram/AuthManager.h"
11 #include "td/telegram/ConfigShared.h"
12 #include "td/telegram/ContactsManager.h"
13 #include "td/telegram/DialogId.h"
14 #include "td/telegram/Global.h"
15 #include "td/telegram/logevent/LogEvent.h"
16 #include "td/telegram/MessagesManager.h"
17 #include "td/telegram/misc.h"
18 #include "td/telegram/net/NetQuery.h"
19 #include "td/telegram/net/NetQueryDispatcher.h"
20 #include "td/telegram/StateManager.h"
21 #include "td/telegram/Td.h"
22 #include "td/telegram/TdDb.h"
23 #include "td/telegram/TdParameters.h"
24
25 #include "td/utils/algorithm.h"
26 #include "td/utils/buffer.h"
27 #include "td/utils/logging.h"
28 #include "td/utils/misc.h"
29 #include "td/utils/port/Clocks.h"
30 #include "td/utils/SliceBuilder.h"
31 #include "td/utils/Status.h"
32 #include "td/utils/tl_helpers.h"
33
34 #include <algorithm>
35 #include <cmath>
36 #include <iterator>
37
38 namespace td {
39
40 class GetTopPeersQuery final : public Td::ResultHandler {
41 Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> promise_;
42
43 public:
GetTopPeersQuery(Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> && promise)44 explicit GetTopPeersQuery(Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> &&promise)
45 : promise_(std::move(promise)) {
46 }
47
send(int64 hash)48 void send(int64 hash) {
49 int32 flags =
50 telegram_api::contacts_getTopPeers::CORRESPONDENTS_MASK | telegram_api::contacts_getTopPeers::BOTS_PM_MASK |
51 telegram_api::contacts_getTopPeers::BOTS_INLINE_MASK | telegram_api::contacts_getTopPeers::GROUPS_MASK |
52 telegram_api::contacts_getTopPeers::CHANNELS_MASK | telegram_api::contacts_getTopPeers::PHONE_CALLS_MASK |
53 telegram_api::contacts_getTopPeers::FORWARD_USERS_MASK | telegram_api::contacts_getTopPeers::FORWARD_CHATS_MASK;
54 send_query(G()->net_query_creator().create(telegram_api::contacts_getTopPeers(
55 flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
56 false /*ignored*/, false /*ignored*/, false /*ignored*/, 0 /*offset*/, 100 /*limit*/, hash)));
57 }
58
on_result(BufferSlice packet)59 void on_result(BufferSlice packet) final {
60 auto result_ptr = fetch_result<telegram_api::contacts_getTopPeers>(packet);
61 if (result_ptr.is_error()) {
62 return on_error(result_ptr.move_as_error());
63 }
64
65 promise_.set_value(result_ptr.move_as_ok());
66 }
67
on_error(Status status)68 void on_error(Status status) final {
69 promise_.set_error(std::move(status));
70 }
71 };
72
73 class ToggleTopPeersQuery final : public Td::ResultHandler {
74 Promise<Unit> promise_;
75
76 public:
ToggleTopPeersQuery(Promise<Unit> && promise)77 explicit ToggleTopPeersQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
78 }
79
send(bool is_enabled)80 void send(bool is_enabled) {
81 send_query(G()->net_query_creator().create(telegram_api::contacts_toggleTopPeers(is_enabled)));
82 }
83
on_result(BufferSlice packet)84 void on_result(BufferSlice packet) final {
85 auto result_ptr = fetch_result<telegram_api::contacts_toggleTopPeers>(packet);
86 if (result_ptr.is_error()) {
87 return on_error(result_ptr.move_as_error());
88 }
89
90 promise_.set_value(Unit());
91 }
92
on_error(Status status)93 void on_error(Status status) final {
94 promise_.set_error(std::move(status));
95 }
96 };
97
98 class ResetTopPeerRatingQuery final : public Td::ResultHandler {
99 DialogId dialog_id_;
100
101 public:
send(TopDialogCategory category,DialogId dialog_id)102 void send(TopDialogCategory category, DialogId dialog_id) {
103 auto input_peer = td_->messages_manager_->get_input_peer(dialog_id, AccessRights::Read);
104 if (input_peer == nullptr) {
105 return;
106 }
107
108 dialog_id_ = dialog_id;
109 send_query(G()->net_query_creator().create(
110 telegram_api::contacts_resetTopPeerRating(get_input_top_peer_category(category), std::move(input_peer))));
111 }
112
on_result(BufferSlice packet)113 void on_result(BufferSlice packet) final {
114 auto result_ptr = fetch_result<telegram_api::contacts_resetTopPeerRating>(packet);
115 if (result_ptr.is_error()) {
116 return on_error(result_ptr.move_as_error());
117 }
118
119 // ignore the result
120 }
121
on_error(Status status)122 void on_error(Status status) final {
123 if (!td_->messages_manager_->on_get_dialog_error(dialog_id_, status, "ResetTopPeerRatingQuery")) {
124 LOG(INFO) << "Receive error for ResetTopPeerRatingQuery: " << status;
125 }
126 }
127 };
128
TopDialogManager(Td * td,ActorShared<> parent)129 TopDialogManager::TopDialogManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) {
130 }
131
update_is_enabled(bool is_enabled)132 void TopDialogManager::update_is_enabled(bool is_enabled) {
133 if (td_->auth_manager_ == nullptr || !td_->auth_manager_->is_authorized() || td_->auth_manager_->is_bot()) {
134 return;
135 }
136
137 if (set_is_enabled(is_enabled)) {
138 G()->td_db()->get_binlog_pmc()->set("top_peers_enabled", is_enabled ? "1" : "0");
139 send_toggle_top_peers(is_enabled);
140
141 loop();
142 }
143 }
144
set_is_enabled(bool is_enabled)145 bool TopDialogManager::set_is_enabled(bool is_enabled) {
146 if (is_enabled_ == is_enabled) {
147 return false;
148 }
149
150 LOG(DEBUG) << "Change top chats is_enabled to " << is_enabled;
151 is_enabled_ = is_enabled;
152 try_start();
153 return true;
154 }
155
send_toggle_top_peers(bool is_enabled)156 void TopDialogManager::send_toggle_top_peers(bool is_enabled) {
157 if (G()->close_flag()) {
158 return;
159 }
160
161 if (have_toggle_top_peers_query_) {
162 have_pending_toggle_top_peers_query_ = true;
163 pending_toggle_top_peers_query_ = is_enabled;
164 return;
165 }
166
167 LOG(DEBUG) << "Send toggle top peers query to " << is_enabled;
168 have_toggle_top_peers_query_ = true;
169
170 auto promise = PromiseCreator::lambda([actor_id = actor_id(this), is_enabled](Result<Unit> result) {
171 send_closure(actor_id, &TopDialogManager::on_toggle_top_peers, is_enabled, std::move(result));
172 });
173 td_->create_handler<ToggleTopPeersQuery>(std::move(promise))->send(is_enabled);
174 }
175
on_toggle_top_peers(bool is_enabled,Result<Unit> && result)176 void TopDialogManager::on_toggle_top_peers(bool is_enabled, Result<Unit> &&result) {
177 CHECK(have_toggle_top_peers_query_);
178 have_toggle_top_peers_query_ = false;
179
180 if (have_pending_toggle_top_peers_query_) {
181 have_pending_toggle_top_peers_query_ = false;
182 if (pending_toggle_top_peers_query_ != is_enabled) {
183 send_toggle_top_peers(pending_toggle_top_peers_query_);
184 return;
185 }
186 }
187
188 if (result.is_ok()) {
189 // everything is synchronized
190 G()->td_db()->get_binlog_pmc()->erase("top_peers_enabled");
191 } else {
192 // let's resend the query forever
193 send_toggle_top_peers(is_enabled);
194 }
195 loop();
196 }
197
on_dialog_used(TopDialogCategory category,DialogId dialog_id,int32 date)198 void TopDialogManager::on_dialog_used(TopDialogCategory category, DialogId dialog_id, int32 date) {
199 if (!is_active_ || !is_enabled_) {
200 return;
201 }
202 auto pos = static_cast<size_t>(category);
203 CHECK(pos < by_category_.size());
204 auto &top_dialogs = by_category_[pos];
205
206 top_dialogs.is_dirty = true;
207 auto it = std::find_if(top_dialogs.dialogs.begin(), top_dialogs.dialogs.end(),
208 [&](auto &top_dialog) { return top_dialog.dialog_id == dialog_id; });
209 if (it == top_dialogs.dialogs.end()) {
210 TopDialog top_dialog;
211 top_dialog.dialog_id = dialog_id;
212 top_dialogs.dialogs.push_back(top_dialog);
213 it = top_dialogs.dialogs.end() - 1;
214 }
215
216 auto delta = rating_add(date, top_dialogs.rating_timestamp);
217 it->rating += delta;
218 while (it != top_dialogs.dialogs.begin()) {
219 auto next = std::prev(it);
220 if (*next < *it) {
221 break;
222 }
223 std::swap(*next, *it);
224 it = next;
225 }
226
227 LOG(INFO) << "Update " << get_top_dialog_category_name(category) << " rating of " << dialog_id << " by " << delta;
228
229 if (!first_unsync_change_) {
230 first_unsync_change_ = Timestamp::now_cached();
231 }
232 loop();
233 }
234
remove_dialog(TopDialogCategory category,DialogId dialog_id,Promise<Unit> && promise)235 void TopDialogManager::remove_dialog(TopDialogCategory category, DialogId dialog_id, Promise<Unit> &&promise) {
236 if (category == TopDialogCategory::Size) {
237 return promise.set_error(Status::Error(400, "Top chat category must be non-empty"));
238 }
239 if (!td_->messages_manager_->have_dialog_force(dialog_id, "remove_dialog")) {
240 return promise.set_error(Status::Error(400, "Chat not found"));
241 }
242 if (!is_active_ || !is_enabled_) {
243 return promise.set_value(Unit());
244 }
245
246 if (category == TopDialogCategory::ForwardUsers && dialog_id.get_type() != DialogType::User) {
247 category = TopDialogCategory::ForwardChats;
248 }
249
250 auto pos = static_cast<size_t>(category);
251 CHECK(pos < by_category_.size());
252 auto &top_dialogs = by_category_[pos];
253
254 td_->create_handler<ResetTopPeerRatingQuery>()->send(category, dialog_id);
255
256 auto it = std::find_if(top_dialogs.dialogs.begin(), top_dialogs.dialogs.end(),
257 [&](auto &top_dialog) { return top_dialog.dialog_id == dialog_id; });
258 if (it == top_dialogs.dialogs.end()) {
259 return promise.set_value(Unit());
260 }
261
262 top_dialogs.is_dirty = true;
263 top_dialogs.dialogs.erase(it);
264 if (!first_unsync_change_) {
265 first_unsync_change_ = Timestamp::now_cached();
266 }
267 loop();
268 promise.set_value(Unit());
269 }
270
get_top_dialogs(TopDialogCategory category,int32 limit,Promise<vector<DialogId>> promise)271 void TopDialogManager::get_top_dialogs(TopDialogCategory category, int32 limit, Promise<vector<DialogId>> promise) {
272 if (category == TopDialogCategory::Size) {
273 return promise.set_error(Status::Error(400, "Top chat category must be non-empty"));
274 }
275 if (limit <= 0) {
276 return promise.set_error(Status::Error(400, "Limit must be positive"));
277 }
278 if (!is_active_) {
279 return promise.set_error(Status::Error(400, "Not supported without chat info database"));
280 }
281 if (!is_enabled_) {
282 return promise.set_error(Status::Error(400, "Top chats computation is disabled"));
283 }
284
285 GetTopDialogsQuery query;
286 query.category = category;
287 query.limit = static_cast<size_t>(limit);
288 query.promise = std::move(promise);
289 pending_get_top_dialogs_.push_back(std::move(query));
290 loop();
291 }
292
update_rating_e_decay()293 void TopDialogManager::update_rating_e_decay() {
294 if (!is_active_) {
295 return;
296 }
297 rating_e_decay_ = narrow_cast<int32>(G()->shared_config().get_option_integer("rating_e_decay", rating_e_decay_));
298 }
299
300 template <class StorerT>
store(const TopDialogManager::TopDialog & top_dialog,StorerT & storer)301 void store(const TopDialogManager::TopDialog &top_dialog, StorerT &storer) {
302 using ::td::store;
303 store(top_dialog.dialog_id, storer);
304 store(top_dialog.rating, storer);
305 }
306
307 template <class ParserT>
parse(TopDialogManager::TopDialog & top_dialog,ParserT & parser)308 void parse(TopDialogManager::TopDialog &top_dialog, ParserT &parser) {
309 using ::td::parse;
310 parse(top_dialog.dialog_id, parser);
311 parse(top_dialog.rating, parser);
312 }
313
314 template <class StorerT>
store(const TopDialogManager::TopDialogs & top_dialogs,StorerT & storer)315 void store(const TopDialogManager::TopDialogs &top_dialogs, StorerT &storer) {
316 using ::td::store;
317 store(top_dialogs.rating_timestamp, storer);
318 store(top_dialogs.dialogs, storer);
319 }
320
321 template <class ParserT>
parse(TopDialogManager::TopDialogs & top_dialogs,ParserT & parser)322 void parse(TopDialogManager::TopDialogs &top_dialogs, ParserT &parser) {
323 using ::td::parse;
324 parse(top_dialogs.rating_timestamp, parser);
325 parse(top_dialogs.dialogs, parser);
326 }
327
rating_add(double now,double rating_timestamp) const328 double TopDialogManager::rating_add(double now, double rating_timestamp) const {
329 return std::exp((now - rating_timestamp) / rating_e_decay_);
330 }
331
current_rating_add(double rating_timestamp) const332 double TopDialogManager::current_rating_add(double rating_timestamp) const {
333 return rating_add(G()->server_time_cached(), rating_timestamp);
334 }
335
normalize_rating()336 void TopDialogManager::normalize_rating() {
337 for (auto &top_dialogs : by_category_) {
338 auto div_by = current_rating_add(top_dialogs.rating_timestamp);
339 top_dialogs.rating_timestamp = G()->server_time_cached();
340 for (auto &dialog : top_dialogs.dialogs) {
341 dialog.rating /= div_by;
342 }
343 top_dialogs.is_dirty = true;
344 }
345 db_sync_state_ = SyncState::None;
346 }
347
do_get_top_dialogs(GetTopDialogsQuery && query)348 void TopDialogManager::do_get_top_dialogs(GetTopDialogsQuery &&query) {
349 vector<DialogId> dialog_ids;
350 if (query.category != TopDialogCategory::ForwardUsers) {
351 auto pos = static_cast<size_t>(query.category);
352 CHECK(pos < by_category_.size());
353 dialog_ids = transform(by_category_[pos].dialogs, [](const auto &x) { return x.dialog_id; });
354 } else {
355 // merge ForwardUsers and ForwardChats
356 auto &users = by_category_[static_cast<size_t>(TopDialogCategory::ForwardUsers)];
357 auto &chats = by_category_[static_cast<size_t>(TopDialogCategory::ForwardChats)];
358 size_t users_pos = 0;
359 size_t chats_pos = 0;
360 while (users_pos < users.dialogs.size() || chats_pos < chats.dialogs.size()) {
361 if (chats_pos == chats.dialogs.size() ||
362 (users_pos < users.dialogs.size() && users.dialogs[users_pos] < chats.dialogs[chats_pos])) {
363 dialog_ids.push_back(users.dialogs[users_pos++].dialog_id);
364 } else {
365 dialog_ids.push_back(chats.dialogs[chats_pos++].dialog_id);
366 }
367 }
368 }
369
370 auto promise = PromiseCreator::lambda(
371 [actor_id = actor_id(this), query = std::move(query)](Result<vector<DialogId>> r_dialog_ids) mutable {
372 if (r_dialog_ids.is_error()) {
373 return query.promise.set_error(r_dialog_ids.move_as_error());
374 }
375 send_closure(actor_id, &TopDialogManager::on_load_dialogs, std::move(query), r_dialog_ids.move_as_ok());
376 });
377 td_->messages_manager_->load_dialogs(std::move(dialog_ids), std::move(promise));
378 }
379
on_load_dialogs(GetTopDialogsQuery && query,vector<DialogId> && dialog_ids)380 void TopDialogManager::on_load_dialogs(GetTopDialogsQuery &&query, vector<DialogId> &&dialog_ids) {
381 auto limit = std::min({query.limit, MAX_TOP_DIALOGS_LIMIT, dialog_ids.size()});
382 vector<DialogId> result;
383 result.reserve(limit);
384 for (auto dialog_id : dialog_ids) {
385 if (dialog_id.get_type() == DialogType::User) {
386 auto user_id = dialog_id.get_user_id();
387 if (td_->contacts_manager_->is_user_deleted(user_id)) {
388 LOG(INFO) << "Skip deleted " << user_id;
389 continue;
390 }
391 if (td_->contacts_manager_->get_my_id() == user_id) {
392 LOG(INFO) << "Skip self " << user_id;
393 continue;
394 }
395 if (query.category == TopDialogCategory::BotInline || query.category == TopDialogCategory::BotPM) {
396 auto r_bot_info = td_->contacts_manager_->get_bot_data(user_id);
397 if (r_bot_info.is_error()) {
398 LOG(INFO) << "Skip not a bot " << user_id;
399 continue;
400 }
401 if (query.category == TopDialogCategory::BotInline &&
402 (r_bot_info.ok().username.empty() || !r_bot_info.ok().is_inline)) {
403 LOG(INFO) << "Skip not inline bot " << user_id;
404 continue;
405 }
406 }
407 }
408
409 result.push_back(dialog_id);
410 if (result.size() == limit) {
411 break;
412 }
413 }
414
415 query.promise.set_value(std::move(result));
416 }
417
do_get_top_peers()418 void TopDialogManager::do_get_top_peers() {
419 std::vector<uint64> ids;
420 for (auto &category : by_category_) {
421 for (auto &top_dialog : category.dialogs) {
422 auto dialog_id = top_dialog.dialog_id;
423 switch (dialog_id.get_type()) {
424 case DialogType::User:
425 ids.push_back(dialog_id.get_user_id().get());
426 break;
427 case DialogType::Chat:
428 ids.push_back(dialog_id.get_chat_id().get());
429 break;
430 case DialogType::Channel:
431 ids.push_back(dialog_id.get_channel_id().get());
432 break;
433 default:
434 break;
435 }
436 }
437 }
438 auto promise = PromiseCreator::lambda(
439 [actor_id = actor_id(this)](Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result) {
440 send_closure(actor_id, &TopDialogManager::on_get_top_peers, std::move(result));
441 });
442 td_->create_handler<GetTopPeersQuery>(std::move(promise))->send(get_vector_hash(ids));
443 }
444
on_get_top_peers(Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result)445 void TopDialogManager::on_get_top_peers(Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result) {
446 normalize_rating(); // once a day too
447
448 if (result.is_error()) {
449 last_server_sync_ = Timestamp::in(SERVER_SYNC_RESEND_DELAY - SERVER_SYNC_DELAY);
450 loop();
451 return;
452 }
453
454 last_server_sync_ = Timestamp::now();
455 server_sync_state_ = SyncState::Ok;
456
457 auto top_peers_parent = result.move_as_ok();
458 LOG(DEBUG) << "Receive contacts_getTopPeers result: " << to_string(top_peers_parent);
459 switch (top_peers_parent->get_id()) {
460 case telegram_api::contacts_topPeersNotModified::ID:
461 // nothing to do
462 break;
463 case telegram_api::contacts_topPeersDisabled::ID:
464 G()->shared_config().set_option_boolean("disable_top_chats", true);
465 set_is_enabled(false); // apply immediately
466 break;
467 case telegram_api::contacts_topPeers::ID: {
468 G()->shared_config().set_option_empty("disable_top_chats");
469 set_is_enabled(true); // apply immediately
470 auto top_peers = move_tl_object_as<telegram_api::contacts_topPeers>(std::move(top_peers_parent));
471
472 td_->contacts_manager_->on_get_users(std::move(top_peers->users_), "on get top chats");
473 td_->contacts_manager_->on_get_chats(std::move(top_peers->chats_), "on get top chats");
474 for (auto &category : top_peers->categories_) {
475 auto dialog_category = get_top_dialog_category(category->category_);
476 auto pos = static_cast<size_t>(dialog_category);
477 CHECK(pos < by_category_.size());
478 auto &top_dialogs = by_category_[pos];
479
480 top_dialogs.is_dirty = true;
481 top_dialogs.dialogs.clear();
482 for (auto &top_peer : category->peers_) {
483 TopDialog top_dialog;
484 top_dialog.dialog_id = DialogId(top_peer->peer_);
485 top_dialog.rating = top_peer->rating_;
486 top_dialogs.dialogs.push_back(std::move(top_dialog));
487 }
488 }
489 db_sync_state_ = SyncState::None;
490 break;
491 }
492 default:
493 UNREACHABLE();
494 }
495
496 G()->td_db()->get_binlog_pmc()->set("top_dialogs_ts", to_string(static_cast<uint32>(Clocks::system())));
497 loop();
498 }
499
do_save_top_dialogs()500 void TopDialogManager::do_save_top_dialogs() {
501 LOG(INFO) << "Save top chats";
502 for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
503 auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
504 auto key = PSTRING() << "top_dialogs#" << get_top_dialog_category_name(top_dialog_category);
505
506 auto &top_dialogs = by_category_[top_dialog_category_i];
507 if (!top_dialogs.is_dirty) {
508 continue;
509 }
510 top_dialogs.is_dirty = false;
511
512 G()->td_db()->get_binlog_pmc()->set(key, log_event_store(top_dialogs).as_slice().str());
513 }
514 db_sync_state_ = SyncState::Ok;
515 first_unsync_change_ = Timestamp();
516 }
517
start_up()518 void TopDialogManager::start_up() {
519 init();
520 }
521
tear_down()522 void TopDialogManager::tear_down() {
523 parent_.reset();
524 }
525
init()526 void TopDialogManager::init() {
527 if (td_->auth_manager_ == nullptr || !td_->auth_manager_->is_authorized()) {
528 return;
529 }
530
531 is_active_ = G()->parameters().use_chat_info_db && !td_->auth_manager_->is_bot();
532 is_enabled_ = !G()->shared_config().get_option_boolean("disable_top_chats");
533 update_rating_e_decay();
534
535 string need_update_top_peers = G()->td_db()->get_binlog_pmc()->get("top_peers_enabled");
536 if (!need_update_top_peers.empty()) {
537 send_toggle_top_peers(need_update_top_peers[0] == '1');
538 }
539
540 try_start();
541 loop();
542 }
543
try_start()544 void TopDialogManager::try_start() {
545 was_first_sync_ = false;
546 first_unsync_change_ = Timestamp();
547 server_sync_state_ = SyncState::None;
548 last_server_sync_ = Timestamp();
549 CHECK(pending_get_top_dialogs_.empty());
550
551 LOG(DEBUG) << "Init is enabled: " << is_enabled_;
552 if (!is_active_) {
553 G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs");
554 return;
555 }
556
557 auto di_top_dialogs_ts = G()->td_db()->get_binlog_pmc()->get("top_dialogs_ts");
558 if (!di_top_dialogs_ts.empty()) {
559 last_server_sync_ = Timestamp::in(to_integer<uint32>(di_top_dialogs_ts) - Clocks::system());
560 if (last_server_sync_.is_in_past()) {
561 server_sync_state_ = SyncState::Ok;
562 }
563 }
564
565 if (is_enabled_) {
566 for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
567 auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
568 auto key = PSTRING() << "top_dialogs#" << get_top_dialog_category_name(top_dialog_category);
569 auto value = G()->td_db()->get_binlog_pmc()->get(key);
570
571 auto &top_dialogs = by_category_[top_dialog_category_i];
572 top_dialogs.is_dirty = false;
573 if (value.empty()) {
574 continue;
575 }
576 log_event_parse(top_dialogs, value).ensure();
577 }
578 normalize_rating();
579 } else {
580 G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs#");
581 for (auto &top_dialogs : by_category_) {
582 top_dialogs.is_dirty = false;
583 top_dialogs.rating_timestamp = 0;
584 top_dialogs.dialogs.clear();
585 }
586 }
587 db_sync_state_ = SyncState::Ok;
588
589 send_closure(G()->state_manager(), &StateManager::wait_first_sync,
590 PromiseCreator::event(self_closure(this, &TopDialogManager::on_first_sync)));
591 }
592
on_first_sync()593 void TopDialogManager::on_first_sync() {
594 was_first_sync_ = true;
595 if (!G()->close_flag() && td_->auth_manager_->is_bot()) {
596 is_active_ = false;
597 try_start();
598 }
599 loop();
600 }
601
loop()602 void TopDialogManager::loop() {
603 if (!is_active_ || G()->close_flag()) {
604 return;
605 }
606
607 if (!pending_get_top_dialogs_.empty()) {
608 for (auto &query : pending_get_top_dialogs_) {
609 do_get_top_dialogs(std::move(query));
610 }
611 pending_get_top_dialogs_.clear();
612 }
613
614 // server sync
615 Timestamp server_sync_timeout;
616 if (server_sync_state_ == SyncState::Ok) {
617 server_sync_timeout = Timestamp::at(last_server_sync_.at() + SERVER_SYNC_DELAY);
618 if (server_sync_timeout.is_in_past()) {
619 server_sync_state_ = SyncState::None;
620 }
621 }
622
623 Timestamp wakeup_timeout;
624 if (server_sync_state_ == SyncState::Ok) {
625 wakeup_timeout.relax(server_sync_timeout);
626 } else if (server_sync_state_ == SyncState::None && was_first_sync_) {
627 server_sync_state_ = SyncState::Pending;
628 do_get_top_peers();
629 }
630
631 if (is_enabled_) {
632 // database sync
633 Timestamp db_sync_timeout;
634 if (db_sync_state_ == SyncState::Ok) {
635 if (first_unsync_change_) {
636 db_sync_timeout = Timestamp::at(first_unsync_change_.at() + DB_SYNC_DELAY);
637 if (db_sync_timeout.is_in_past()) {
638 db_sync_state_ = SyncState::None;
639 }
640 }
641 }
642
643 if (db_sync_state_ == SyncState::Ok) {
644 wakeup_timeout.relax(db_sync_timeout);
645 } else if (db_sync_state_ == SyncState::None) {
646 if (server_sync_state_ == SyncState::Ok) {
647 do_save_top_dialogs();
648 }
649 }
650 }
651
652 if (wakeup_timeout) {
653 LOG(INFO) << "Wakeup in: " << wakeup_timeout.in();
654 set_timeout_at(wakeup_timeout.at());
655 } else {
656 LOG(INFO) << "Wakeup: never";
657 cancel_timeout();
658 }
659 }
660
661 } // namespace td
662