1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/UpdatesManager.h"
8
9 #include "td/telegram/AnimationsManager.h"
10 #include "td/telegram/AuthManager.h"
11 #include "td/telegram/CallbackQueriesManager.h"
12 #include "td/telegram/CallManager.h"
13 #include "td/telegram/ChannelId.h"
14 #include "td/telegram/ChatId.h"
15 #include "td/telegram/ConfigManager.h"
16 #include "td/telegram/ConfigShared.h"
17 #include "td/telegram/ContactsManager.h"
18 #include "td/telegram/DialogAction.h"
19 #include "td/telegram/DialogId.h"
20 #include "td/telegram/DialogInviteLink.h"
21 #include "td/telegram/DialogParticipant.h"
22 #include "td/telegram/FolderId.h"
23 #include "td/telegram/Global.h"
24 #include "td/telegram/GroupCallManager.h"
25 #include "td/telegram/InlineQueriesManager.h"
26 #include "td/telegram/LanguagePackManager.h"
27 #include "td/telegram/Location.h"
28 #include "td/telegram/MessageId.h"
29 #include "td/telegram/MessagesManager.h"
30 #include "td/telegram/MessageTtlSetting.h"
31 #include "td/telegram/net/DcOptions.h"
32 #include "td/telegram/net/NetQuery.h"
33 #include "td/telegram/NotificationManager.h"
34 #include "td/telegram/Payments.h"
35 #include "td/telegram/PollId.h"
36 #include "td/telegram/PollManager.h"
37 #include "td/telegram/PrivacyManager.h"
38 #include "td/telegram/ScheduledServerMessageId.h"
39 #include "td/telegram/SecretChatId.h"
40 #include "td/telegram/SecretChatsManager.h"
41 #include "td/telegram/ServerMessageId.h"
42 #include "td/telegram/StateManager.h"
43 #include "td/telegram/StickerSetId.h"
44 #include "td/telegram/StickersManager.h"
45 #include "td/telegram/Td.h"
46 #include "td/telegram/td_api.h"
47 #include "td/telegram/TdDb.h"
48 #include "td/telegram/telegram_api.hpp"
49 #include "td/telegram/ThemeManager.h"
50 #include "td/telegram/WebPagesManager.h"
51
52 #include "td/actor/MultiPromise.h"
53
54 #include "td/utils/algorithm.h"
55 #include "td/utils/buffer.h"
56 #include "td/utils/logging.h"
57 #include "td/utils/misc.h"
58 #include "td/utils/Random.h"
59 #include "td/utils/Slice.h"
60 #include "td/utils/SliceBuilder.h"
61 #include "td/utils/Status.h"
62 #include "td/utils/StringBuilder.h"
63 #include "td/utils/Time.h"
64
65 #include <iterator>
66 #include <limits>
67
68 namespace td {
69
// Verbosity level of the "get_difference" log tag used by VLOG(get_difference) in this file;
// it is initialized with the INFO verbosity level.
int VERBOSITY_NAME(get_difference) = VERBOSITY_NAME(INFO);
71
72 class OnUpdate {
73 UpdatesManager *updates_manager_;
74 tl_object_ptr<telegram_api::Update> &update_;
75 mutable Promise<Unit> promise_;
76
77 public:
OnUpdate(UpdatesManager * updates_manager,tl_object_ptr<telegram_api::Update> & update,Promise<Unit> && promise)78 OnUpdate(UpdatesManager *updates_manager, tl_object_ptr<telegram_api::Update> &update, Promise<Unit> &&promise)
79 : updates_manager_(updates_manager), update_(update), promise_(std::move(promise)) {
80 }
81
82 template <class T>
operator ()(T & obj) const83 void operator()(T &obj) const {
84 CHECK(&*update_ == &obj);
85 updates_manager_->on_update(move_tl_object_as<T>(update_), std::move(promise_));
86 }
87 };
88
89 class GetUpdatesStateQuery final : public Td::ResultHandler {
90 Promise<tl_object_ptr<telegram_api::updates_state>> promise_;
91
92 public:
GetUpdatesStateQuery(Promise<tl_object_ptr<telegram_api::updates_state>> && promise)93 explicit GetUpdatesStateQuery(Promise<tl_object_ptr<telegram_api::updates_state>> &&promise)
94 : promise_(std::move(promise)) {
95 }
96
send()97 void send() {
98 send_query(G()->net_query_creator().create(telegram_api::updates_getState()));
99 }
100
on_result(BufferSlice packet)101 void on_result(BufferSlice packet) final {
102 auto result_ptr = fetch_result<telegram_api::updates_getState>(packet);
103 if (result_ptr.is_error()) {
104 return on_error(result_ptr.move_as_error());
105 }
106
107 promise_.set_value(result_ptr.move_as_ok());
108 }
109
on_error(Status status)110 void on_error(Status status) final {
111 promise_.set_error(std::move(status));
112 }
113 };
114
115 class PingServerQuery final : public Td::ResultHandler {
116 Promise<tl_object_ptr<telegram_api::updates_state>> promise_;
117
118 public:
PingServerQuery(Promise<tl_object_ptr<telegram_api::updates_state>> && promise)119 explicit PingServerQuery(Promise<tl_object_ptr<telegram_api::updates_state>> &&promise)
120 : promise_(std::move(promise)) {
121 }
122
send()123 void send() {
124 send_query(G()->net_query_creator().create(telegram_api::updates_getState()));
125 }
126
on_result(BufferSlice packet)127 void on_result(BufferSlice packet) final {
128 auto result_ptr = fetch_result<telegram_api::updates_getState>(packet);
129 if (result_ptr.is_error()) {
130 return on_error(result_ptr.move_as_error());
131 }
132
133 promise_.set_value(result_ptr.move_as_ok());
134 }
135
on_error(Status status)136 void on_error(Status status) final {
137 promise_.set_error(std::move(status));
138 }
139 };
140
141 class GetDifferenceQuery final : public Td::ResultHandler {
142 Promise<tl_object_ptr<telegram_api::updates_Difference>> promise_;
143
144 public:
GetDifferenceQuery(Promise<tl_object_ptr<telegram_api::updates_Difference>> && promise)145 explicit GetDifferenceQuery(Promise<tl_object_ptr<telegram_api::updates_Difference>> &&promise)
146 : promise_(std::move(promise)) {
147 }
148
send(int32 pts,int32 date,int32 qts)149 void send(int32 pts, int32 date, int32 qts) {
150 send_query(G()->net_query_creator().create(telegram_api::updates_getDifference(0, pts, 0, date, qts)));
151 }
152
on_result(BufferSlice packet)153 void on_result(BufferSlice packet) final {
154 VLOG(get_difference) << "Receive getDifference result of size " << packet.size();
155 auto result_ptr = fetch_result<telegram_api::updates_getDifference>(packet);
156 if (result_ptr.is_error()) {
157 return on_error(result_ptr.move_as_error());
158 }
159
160 promise_.set_value(result_ptr.move_as_ok());
161 }
162
on_error(Status status)163 void on_error(Status status) final {
164 promise_.set_error(std::move(status));
165 }
166 };
167
// Out-of-class definitions of UpdatesManager timing constants.
// NOTE(review): MAX_UNFILLED_GAP_TIME isn't used in the visible part of the file; presumably it is
// the time to wait before filling a gap in updates, in seconds - confirm.
const double UpdatesManager::MAX_UNFILLED_GAP_TIME = 0.7;
// Used by save_pts/save_qts to throttle writes of pts/qts to the binlog for bots (presumably in seconds).
const double UpdatesManager::MAX_PTS_SAVE_DELAY = 0.05;
170
UpdatesManager(Td * td,ActorShared<> parent)171 UpdatesManager::UpdatesManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) {
172 last_pts_save_time_ = last_qts_save_time_ = Time::now() - 2 * MAX_PTS_SAVE_DELAY;
173 }
174
tear_down()175 void UpdatesManager::tear_down() {
176 parent_.reset();
177 }
178
hangup_shared()179 void UpdatesManager::hangup_shared() {
180 ref_cnt_--;
181 if (ref_cnt_ == 0) {
182 stop();
183 }
184 }
185
hangup()186 void UpdatesManager::hangup() {
187 pending_pts_updates_.clear();
188 postponed_pts_updates_.clear();
189 postponed_updates_.clear();
190 pending_seq_updates_.clear();
191 pending_qts_updates_.clear();
192
193 hangup_shared();
194 }
195
create_reference()196 ActorShared<UpdatesManager> UpdatesManager::create_reference() {
197 ref_cnt_++;
198 return actor_shared(this, 1);
199 }
200
fill_pts_gap(void * td)201 void UpdatesManager::fill_pts_gap(void *td) {
202 CHECK(td != nullptr);
203 if (G()->close_flag()) {
204 return;
205 }
206
207 auto updates_manager = static_cast<Td *>(td)->updates_manager_.get();
208 auto min_pts = std::numeric_limits<int32>::max();
209 auto max_pts = 0;
210 if (!updates_manager->pending_pts_updates_.empty()) {
211 min_pts = min(min_pts, updates_manager->pending_pts_updates_.begin()->first);
212 max_pts = max(max_pts, updates_manager->pending_pts_updates_.rbegin()->first);
213 }
214 if (!updates_manager->postponed_pts_updates_.empty()) {
215 min_pts = min(min_pts, updates_manager->postponed_pts_updates_.begin()->first);
216 max_pts = max(max_pts, updates_manager->postponed_pts_updates_.rbegin()->first);
217 }
218 string source = PSTRING() << "pts from " << updates_manager->get_pts() << " to " << min_pts << '-' << max_pts;
219 fill_gap(td, source.c_str());
220 }
221
fill_seq_gap(void * td)222 void UpdatesManager::fill_seq_gap(void *td) {
223 CHECK(td != nullptr);
224 if (G()->close_flag()) {
225 return;
226 }
227
228 auto updates_manager = static_cast<Td *>(td)->updates_manager_.get();
229 auto min_seq = std::numeric_limits<int32>::max();
230 auto max_seq = 0;
231 if (!updates_manager->pending_seq_updates_.empty()) {
232 min_seq = updates_manager->pending_seq_updates_.begin()->first;
233 max_seq = updates_manager->pending_seq_updates_.rbegin()->second.seq_end;
234 }
235 string source = PSTRING() << "seq from " << updates_manager->seq_ << " to " << min_seq << '-' << max_seq;
236 fill_gap(td, source.c_str());
237 }
238
fill_qts_gap(void * td)239 void UpdatesManager::fill_qts_gap(void *td) {
240 CHECK(td != nullptr);
241 if (G()->close_flag()) {
242 return;
243 }
244
245 auto updates_manager = static_cast<Td *>(td)->updates_manager_.get();
246 auto min_qts = std::numeric_limits<int32>::max();
247 auto max_qts = 0;
248 if (!updates_manager->pending_qts_updates_.empty()) {
249 min_qts = updates_manager->pending_qts_updates_.begin()->first;
250 max_qts = updates_manager->pending_qts_updates_.rbegin()->first;
251 }
252 string source = PSTRING() << "qts from " << updates_manager->get_qts() << " to " << min_qts << '-' << max_qts;
253 fill_gap(td, source.c_str());
254 }
255
fill_get_difference_gap(void * td)256 void UpdatesManager::fill_get_difference_gap(void *td) {
257 fill_gap(td, nullptr);
258 }
259
fill_gap(void * td,const char * source)260 void UpdatesManager::fill_gap(void *td, const char *source) {
261 CHECK(td != nullptr);
262 if (G()->close_flag() || !static_cast<Td *>(td)->auth_manager_->is_authorized()) {
263 return;
264 }
265 auto updates_manager = static_cast<Td *>(td)->updates_manager_.get();
266
267 if (source != nullptr && !updates_manager->running_get_difference_) {
268 LOG(WARNING) << "Filling gap in " << source << " by running getDifference";
269 }
270
271 updates_manager->get_difference("fill_gap");
272 }
273
get_difference(const char * source)274 void UpdatesManager::get_difference(const char *source) {
275 if (G()->close_flag() || !td_->auth_manager_->is_authorized()) {
276 return;
277 }
278 if (get_pts() == -1) {
279 init_state();
280 return;
281 }
282
283 if (running_get_difference_) {
284 VLOG(get_difference) << "Skip running getDifference from " << source << " because it is already running";
285 return;
286 }
287
288 run_get_difference(false, source);
289 }
290
run_get_difference(bool is_recursive,const char * source)291 void UpdatesManager::run_get_difference(bool is_recursive, const char *source) {
292 CHECK(get_pts() != -1);
293 CHECK(td_->auth_manager_->is_authorized());
294 CHECK(!running_get_difference_);
295
296 running_get_difference_ = true;
297
298 int32 pts = get_pts();
299 int32 date = get_date();
300 int32 qts = get_qts();
301 if (pts < 0) {
302 pts = 0;
303 }
304
305 VLOG(get_difference) << "-----BEGIN GET DIFFERENCE----- from " << source << " with pts = " << pts << ", qts = " << qts
306 << ", date = " << date;
307
308 before_get_difference(false);
309
310 if (!is_recursive) {
311 min_postponed_update_pts_ = 0;
312 min_postponed_update_qts_ = 0;
313 }
314
315 auto promise = PromiseCreator::lambda([](Result<tl_object_ptr<telegram_api::updates_Difference>> result) {
316 if (result.is_ok()) {
317 send_closure(G()->updates_manager(), &UpdatesManager::on_get_difference, result.move_as_ok());
318 } else {
319 send_closure(G()->updates_manager(), &UpdatesManager::on_failed_get_difference, result.move_as_error());
320 }
321 });
322 td_->create_handler<GetDifferenceQuery>(std::move(promise))->send(pts, date, qts);
323 last_get_difference_pts_ = pts;
324 last_get_difference_qts_ = qts;
325 }
326
before_get_difference(bool is_initial)327 void UpdatesManager::before_get_difference(bool is_initial) {
328 // may be called many times before after_get_difference is called
329 send_closure(G()->state_manager(), &StateManager::on_synchronized, false);
330
331 postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()),
332 std::make_move_iterator(pending_pts_updates_.end()));
333
334 drop_all_pending_pts_updates();
335
336 send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference);
337 }
338
add_pts(int32 pts)339 Promise<> UpdatesManager::add_pts(int32 pts) {
340 auto id = pts_manager_.add_pts(pts);
341 return PromiseCreator::event(self_closure(this, &UpdatesManager::on_pts_ack, id));
342 }
343
add_qts(int32 qts)344 Promise<> UpdatesManager::add_qts(int32 qts) {
345 auto id = qts_manager_.add_pts(qts);
346 return PromiseCreator::event(self_closure(this, &UpdatesManager::on_qts_ack, id));
347 }
348
on_pts_ack(PtsManager::PtsId ack_token)349 void UpdatesManager::on_pts_ack(PtsManager::PtsId ack_token) {
350 auto old_pts = pts_manager_.db_pts();
351 auto new_pts = pts_manager_.finish(ack_token);
352 if (old_pts != new_pts) {
353 save_pts(new_pts);
354 }
355 }
356
on_qts_ack(PtsManager::PtsId ack_token)357 void UpdatesManager::on_qts_ack(PtsManager::PtsId ack_token) {
358 auto old_qts = qts_manager_.db_pts();
359 auto new_qts = qts_manager_.finish(ack_token);
360 if (old_qts != new_qts) {
361 save_qts(new_qts);
362 }
363 }
364
save_pts(int32 pts)365 void UpdatesManager::save_pts(int32 pts) {
366 if (pts == std::numeric_limits<int32>::max()) {
367 G()->td_db()->get_binlog_pmc()->erase("updates.pts");
368 last_pts_save_time_ -= 2 * MAX_PTS_SAVE_DELAY;
369 pending_pts_ = 0;
370 } else if (!G()->ignore_background_updates()) {
371 auto now = Time::now();
372 auto delay = last_pts_save_time_ + MAX_PTS_SAVE_DELAY - now;
373 if (delay <= 0 || !td_->auth_manager_->is_bot()) {
374 last_pts_save_time_ = now;
375 pending_pts_ = 0;
376 G()->td_db()->get_binlog_pmc()->set("updates.pts", to_string(pts));
377 } else {
378 pending_pts_ = pts;
379 if (!has_timeout()) {
380 set_timeout_in(delay);
381 }
382 }
383 }
384 }
385
save_qts(int32 qts)386 void UpdatesManager::save_qts(int32 qts) {
387 if (!G()->ignore_background_updates()) {
388 auto now = Time::now();
389 auto delay = last_qts_save_time_ + MAX_PTS_SAVE_DELAY - now;
390 if (delay <= 0 || !td_->auth_manager_->is_bot()) {
391 last_qts_save_time_ = now;
392 pending_qts_ = 0;
393 G()->td_db()->get_binlog_pmc()->set("updates.qts", to_string(qts));
394 } else {
395 pending_qts_ = qts;
396 if (!has_timeout()) {
397 set_timeout_in(delay);
398 }
399 }
400 }
401 }
402
timeout_expired()403 void UpdatesManager::timeout_expired() {
404 if (pending_pts_ != 0) {
405 last_pts_save_time_ -= 2 * MAX_PTS_SAVE_DELAY;
406 save_pts(pending_pts_);
407 CHECK(pending_pts_ == 0);
408 }
409 if (pending_qts_ != 0) {
410 last_qts_save_time_ -= 2 * MAX_PTS_SAVE_DELAY;
411 save_qts(pending_qts_);
412 CHECK(pending_qts_ == 0);
413 }
414 }
415
set_pts(int32 pts,const char * source)416 Promise<> UpdatesManager::set_pts(int32 pts, const char *source) {
417 if (pts == std::numeric_limits<int32>::max()) {
418 LOG(WARNING) << "Update pts from " << get_pts() << " to -1 from " << source;
419 save_pts(pts);
420 auto result = add_pts(pts);
421 init_state();
422 return result;
423 }
424 Promise<> result;
425 if (pts > get_pts() || (0 < pts && pts < get_pts() - 399999)) { // pts can only go up or drop cardinally
426 if (pts < get_pts() - 399999) {
427 LOG(WARNING) << "Pts decreases from " << get_pts() << " to " << pts << " from " << source;
428 } else {
429 LOG(INFO) << "Update pts from " << get_pts() << " to " << pts << " from " << source;
430 }
431
432 result = add_pts(pts);
433 if (last_get_difference_pts_ < get_pts() - FORCED_GET_DIFFERENCE_PTS_DIFF) {
434 last_get_difference_pts_ = get_pts();
435 schedule_get_difference("rare pts getDifference");
436 }
437 } else if (pts < get_pts()) {
438 LOG(ERROR) << "Receive wrong pts = " << pts << " from " << source << ". Current pts = " << get_pts();
439 }
440 return result;
441 }
442
set_date(int32 date,bool from_update,string date_source)443 void UpdatesManager::set_date(int32 date, bool from_update, string date_source) {
444 if (date > date_) {
445 LOG(INFO) << "Update date to " << date;
446 if (from_update && false) { // date in updates is decreased by the server
447 date--;
448
449 if (date == date_) {
450 return;
451 }
452 }
453 auto now = G()->unix_time();
454 if (date_ > now + 1) {
455 LOG(ERROR) << "Receive wrong by " << (date_ - now) << " date = " << date_ << " from " << date_source
456 << ". Now = " << now;
457 date_ = now;
458 if (date_ <= date) {
459 return;
460 }
461 }
462
463 date_ = date;
464 date_source_ = std::move(date_source);
465 if (!G()->ignore_background_updates()) {
466 G()->td_db()->get_binlog_pmc()->set("updates.date", to_string(date));
467 }
468 } else if (date < date_) {
469 if (from_update) {
470 date++;
471
472 if (date == date_) {
473 return;
474 }
475 }
476 LOG(ERROR) << "Receive wrong by " << (date_ - date) << " date = " << date << " from " << date_source
477 << ". Current date = " << date_ << " from " << date_source_;
478 }
479 }
480
is_acceptable_user(UserId user_id) const481 bool UpdatesManager::is_acceptable_user(UserId user_id) const {
482 return td_->contacts_manager_->have_user_force(user_id) && td_->contacts_manager_->have_user(user_id);
483 }
484
is_acceptable_chat(ChatId chat_id) const485 bool UpdatesManager::is_acceptable_chat(ChatId chat_id) const {
486 return td_->contacts_manager_->have_chat_force(chat_id);
487 }
488
is_acceptable_channel(ChannelId channel_id) const489 bool UpdatesManager::is_acceptable_channel(ChannelId channel_id) const {
490 return td_->contacts_manager_->have_channel_force(channel_id);
491 }
492
is_acceptable_peer(const tl_object_ptr<telegram_api::Peer> & peer) const493 bool UpdatesManager::is_acceptable_peer(const tl_object_ptr<telegram_api::Peer> &peer) const {
494 if (peer == nullptr) {
495 return true;
496 }
497
498 DialogId dialog_id(peer);
499 switch (dialog_id.get_type()) {
500 case DialogType::User:
501 if (!is_acceptable_user(dialog_id.get_user_id())) {
502 return false;
503 }
504 break;
505 case DialogType::Chat:
506 if (!is_acceptable_chat(dialog_id.get_chat_id())) {
507 return false;
508 }
509 break;
510 case DialogType::Channel:
511 if (!is_acceptable_channel(dialog_id.get_channel_id())) {
512 return false;
513 }
514 break;
515 case DialogType::None:
516 return false;
517 case DialogType::SecretChat:
518 default:
519 UNREACHABLE();
520 return false;
521 }
522 return true;
523 }
524
is_acceptable_message_entities(const vector<tl_object_ptr<telegram_api::MessageEntity>> & message_entities) const525 bool UpdatesManager::is_acceptable_message_entities(
526 const vector<tl_object_ptr<telegram_api::MessageEntity>> &message_entities) const {
527 for (auto &entity : message_entities) {
528 if (entity->get_id() == telegram_api::messageEntityMentionName::ID) {
529 auto entity_mention_name = static_cast<const telegram_api::messageEntityMentionName *>(entity.get());
530 UserId user_id(entity_mention_name->user_id_);
531 if (!is_acceptable_user(user_id) || !td_->contacts_manager_->have_input_user(user_id)) {
532 return false;
533 }
534 }
535 }
536 return true;
537 }
538
is_acceptable_reply_markup(const tl_object_ptr<telegram_api::ReplyMarkup> & reply_markup) const539 bool UpdatesManager::is_acceptable_reply_markup(const tl_object_ptr<telegram_api::ReplyMarkup> &reply_markup) const {
540 if (reply_markup == nullptr || reply_markup->get_id() != telegram_api::replyInlineMarkup::ID) {
541 return true;
542 }
543 for (const auto &row : static_cast<const telegram_api::replyInlineMarkup *>(reply_markup.get())->rows_) {
544 for (const auto &button : row->buttons_) {
545 if (button->get_id() == telegram_api::keyboardButtonUserProfile::ID) {
546 auto user_profile_button = static_cast<const telegram_api::keyboardButtonUserProfile *>(button.get());
547 UserId user_id(user_profile_button->user_id_);
548 if (!is_acceptable_user(user_id) || !td_->contacts_manager_->have_input_user(user_id)) {
549 return false;
550 }
551 }
552 }
553 }
554 return true;
555 }
556
is_acceptable_message_reply_header(const telegram_api::object_ptr<telegram_api::messageReplyHeader> & header) const557 bool UpdatesManager::is_acceptable_message_reply_header(
558 const telegram_api::object_ptr<telegram_api::messageReplyHeader> &header) const {
559 if (header == nullptr) {
560 return true;
561 }
562
563 if (!is_acceptable_peer(header->reply_to_peer_id_)) {
564 return false;
565 }
566 return true;
567 }
568
is_acceptable_message_forward_header(const telegram_api::object_ptr<telegram_api::messageFwdHeader> & header) const569 bool UpdatesManager::is_acceptable_message_forward_header(
570 const telegram_api::object_ptr<telegram_api::messageFwdHeader> &header) const {
571 if (header == nullptr) {
572 return true;
573 }
574
575 if (!is_acceptable_peer(header->from_id_)) {
576 return false;
577 }
578 if (!is_acceptable_peer(header->saved_from_peer_)) {
579 return false;
580 }
581 return true;
582 }
583
is_acceptable_message(const telegram_api::Message * message_ptr) const584 bool UpdatesManager::is_acceptable_message(const telegram_api::Message *message_ptr) const {
585 CHECK(message_ptr != nullptr);
586 int32 constructor_id = message_ptr->get_id();
587
588 switch (constructor_id) {
589 case telegram_api::messageEmpty::ID:
590 return true;
591 case telegram_api::message::ID: {
592 auto message = static_cast<const telegram_api::message *>(message_ptr);
593
594 if (!is_acceptable_peer(message->peer_id_)) {
595 return false;
596 }
597 if (!is_acceptable_peer(message->from_id_)) {
598 return false;
599 }
600
601 if (!is_acceptable_message_reply_header(message->reply_to_)) {
602 return false;
603 }
604 if (!is_acceptable_message_forward_header(message->fwd_from_)) {
605 return false;
606 }
607
608 if ((message->flags_ & MessagesManager::MESSAGE_FLAG_IS_SENT_VIA_BOT) &&
609 !is_acceptable_user(UserId(message->via_bot_id_))) {
610 return false;
611 }
612
613 if (!is_acceptable_message_entities(message->entities_)) {
614 return false;
615 }
616
617 if (message->flags_ & MessagesManager::MESSAGE_FLAG_HAS_MEDIA) {
618 CHECK(message->media_ != nullptr);
619 auto media_id = message->media_->get_id();
620 if (media_id == telegram_api::messageMediaContact::ID) {
621 auto message_media_contact = static_cast<const telegram_api::messageMediaContact *>(message->media_.get());
622 UserId user_id(message_media_contact->user_id_);
623 if (user_id != UserId() && !is_acceptable_user(user_id)) {
624 return false;
625 }
626 }
627 /*
628 // the users are always min, so no need to check
629 if (media_id == telegram_api::messageMediaPoll::ID) {
630 auto message_media_poll = static_cast<const telegram_api::messageMediaPoll *>(message->media_.get());
631 for (auto recent_voter_user_id : message_media_poll->results_->recent_voters_) {
632 UserId user_id(recent_voter_user_id);
633 if (!is_acceptable_user(user_id)) {
634 return false;
635 }
636 }
637 }
638 */
639 /*
640 // the channel is always min, so no need to check
641 if (media_id == telegram_api::messageMediaWebPage::ID) {
642 auto message_media_web_page = static_cast<const telegram_api::messageMediaWebPage *>(message->media_.get());
643 if (message_media_web_page->webpage_->get_id() == telegram_api::webPage::ID) {
644 auto web_page = static_cast<const telegram_api::webPage *>(message_media_web_page->webpage_.get());
645 if (web_page->cached_page_ != nullptr) {
646 const vector<tl_object_ptr<telegram_api::PageBlock>> *page_blocks = nullptr;
647 downcast_call(*web_page->cached_page_, [&page_blocks](auto &page) { page_blocks = &page.blocks_; });
648 CHECK(page_blocks != nullptr);
649 for (auto &page_block : *page_blocks) {
650 if (page_block->get_id() == telegram_api::pageBlockChannel::ID) {
651 auto page_block_channel = static_cast<const telegram_api::pageBlockChannel *>(page_block.get());
652 auto channel_id = ContactsManager::get_channel_id(page_block_channel->channel_);
653 if (channel_id.is_valid()) {
654 if (!is_acceptable_channel(channel_id)) {
655 return false;
656 }
657 } else {
658 LOG(ERROR) << "Receive wrong channel " << to_string(page_block_channel->channel_);
659 }
660 }
661 }
662 }
663 }
664 }
665 */
666 } else {
667 CHECK(message->media_ == nullptr);
668 }
669
670 /*
671 // the dialogs are always min, so no need to check
672 if (message->replies_ != nullptr) {
673 for (auto &peer : message->replies_->recent_repliers_) {
674 if (!is_acceptable_peer(peer)) {
675 return false;
676 }
677 }
678 }
679 */
680
681 break;
682 }
683 case telegram_api::messageService::ID: {
684 auto message = static_cast<const telegram_api::messageService *>(message_ptr);
685
686 if (!is_acceptable_peer(message->peer_id_)) {
687 return false;
688 }
689 if (!is_acceptable_peer(message->from_id_)) {
690 return false;
691 }
692
693 const telegram_api::MessageAction *action = message->action_.get();
694 CHECK(action != nullptr);
695
696 switch (action->get_id()) {
697 case telegram_api::messageActionEmpty::ID:
698 case telegram_api::messageActionChatEditTitle::ID:
699 case telegram_api::messageActionChatEditPhoto::ID:
700 case telegram_api::messageActionChatDeletePhoto::ID:
701 case telegram_api::messageActionCustomAction::ID:
702 case telegram_api::messageActionBotAllowed::ID:
703 case telegram_api::messageActionHistoryClear::ID:
704 case telegram_api::messageActionChannelCreate::ID:
705 case telegram_api::messageActionPinMessage::ID:
706 case telegram_api::messageActionGameScore::ID:
707 case telegram_api::messageActionPhoneCall::ID:
708 case telegram_api::messageActionPaymentSent::ID:
709 case telegram_api::messageActionPaymentSentMe::ID:
710 case telegram_api::messageActionScreenshotTaken::ID:
711 case telegram_api::messageActionSecureValuesSent::ID:
712 case telegram_api::messageActionSecureValuesSentMe::ID:
713 case telegram_api::messageActionContactSignUp::ID:
714 case telegram_api::messageActionGroupCall::ID:
715 case telegram_api::messageActionGroupCallScheduled::ID:
716 case telegram_api::messageActionSetMessagesTTL::ID:
717 case telegram_api::messageActionSetChatTheme::ID:
718 case telegram_api::messageActionChatJoinedByRequest::ID:
719 break;
720 case telegram_api::messageActionChatCreate::ID: {
721 auto chat_create = static_cast<const telegram_api::messageActionChatCreate *>(action);
722 for (auto &user : chat_create->users_) {
723 if (!is_acceptable_user(UserId(user))) {
724 return false;
725 }
726 }
727 break;
728 }
729 case telegram_api::messageActionChatAddUser::ID: {
730 auto chat_add_user = static_cast<const telegram_api::messageActionChatAddUser *>(action);
731 for (auto &user : chat_add_user->users_) {
732 if (!is_acceptable_user(UserId(user))) {
733 return false;
734 }
735 }
736 break;
737 }
738 case telegram_api::messageActionChatJoinedByLink::ID:
739 // inviter_id_ isn't used
740 break;
741 case telegram_api::messageActionChatDeleteUser::ID: {
742 auto chat_delete_user = static_cast<const telegram_api::messageActionChatDeleteUser *>(action);
743 if (!is_acceptable_user(UserId(chat_delete_user->user_id_))) {
744 return false;
745 }
746 break;
747 }
748 case telegram_api::messageActionChatMigrateTo::ID: {
749 auto chat_migrate_to = static_cast<const telegram_api::messageActionChatMigrateTo *>(action);
750 if (!is_acceptable_channel(ChannelId(chat_migrate_to->channel_id_))) {
751 return false;
752 }
753 break;
754 }
755 case telegram_api::messageActionChannelMigrateFrom::ID: {
756 auto channel_migrate_from = static_cast<const telegram_api::messageActionChannelMigrateFrom *>(action);
757 if (!is_acceptable_chat(ChatId(channel_migrate_from->chat_id_))) {
758 return false;
759 }
760 break;
761 }
762 case telegram_api::messageActionGeoProximityReached::ID: {
763 auto geo_proximity_reached = static_cast<const telegram_api::messageActionGeoProximityReached *>(action);
764 if (!is_acceptable_peer(geo_proximity_reached->from_id_)) {
765 return false;
766 }
767 if (!is_acceptable_peer(geo_proximity_reached->to_id_)) {
768 return false;
769 }
770 break;
771 }
772 case telegram_api::messageActionInviteToGroupCall::ID: {
773 auto invite_to_group_call = static_cast<const telegram_api::messageActionInviteToGroupCall *>(action);
774 for (auto &user : invite_to_group_call->users_) {
775 if (!is_acceptable_user(UserId(user))) {
776 return false;
777 }
778 }
779 break;
780 }
781 default:
782 UNREACHABLE();
783 return false;
784 }
785 break;
786 }
787 default:
788 UNREACHABLE();
789 return false;
790 }
791
792 return true;
793 }
794
is_acceptable_update(const telegram_api::Update * update) const795 bool UpdatesManager::is_acceptable_update(const telegram_api::Update *update) const {
796 if (update == nullptr) {
797 return true;
798 }
799 int32 id = update->get_id();
800 const telegram_api::Message *message = nullptr;
801 if (id == telegram_api::updateNewMessage::ID) {
802 message = static_cast<const telegram_api::updateNewMessage *>(update)->message_.get();
803 }
804 if (id == telegram_api::updateNewChannelMessage::ID) {
805 message = static_cast<const telegram_api::updateNewChannelMessage *>(update)->message_.get();
806 }
807 if (id == telegram_api::updateNewScheduledMessage::ID) {
808 message = static_cast<const telegram_api::updateNewScheduledMessage *>(update)->message_.get();
809 }
810 if (id == telegram_api::updateEditMessage::ID) {
811 message = static_cast<const telegram_api::updateEditMessage *>(update)->message_.get();
812 }
813 if (id == telegram_api::updateEditChannelMessage::ID) {
814 message = static_cast<const telegram_api::updateEditChannelMessage *>(update)->message_.get();
815 }
816 if (message != nullptr) {
817 return is_acceptable_message(message);
818 }
819
820 if (id == telegram_api::updateDraftMessage::ID) {
821 auto update_draft_message = static_cast<const telegram_api::updateDraftMessage *>(update);
822 CHECK(update_draft_message->draft_ != nullptr);
823 if (update_draft_message->draft_->get_id() == telegram_api::draftMessage::ID) {
824 auto draft_message = static_cast<const telegram_api::draftMessage *>(update_draft_message->draft_.get());
825 return is_acceptable_message_entities(draft_message->entities_);
826 }
827 }
828
829 return true;
830 }
831
on_get_updates(tl_object_ptr<telegram_api::Updates> && updates_ptr,Promise<Unit> && promise)832 void UpdatesManager::on_get_updates(tl_object_ptr<telegram_api::Updates> &&updates_ptr, Promise<Unit> &&promise) {
833 promise = PromiseCreator::lambda([promise = std::move(promise)](Result<Unit> result) mutable {
834 if (!G()->close_flag() && result.is_error()) {
835 LOG(ERROR) << "Failed to process updates: " << result.error();
836 }
837 promise.set_value(Unit());
838 });
839
840 CHECK(updates_ptr != nullptr);
841 auto updates_type = updates_ptr->get_id();
842 if (updates_type != telegram_api::updateShort::ID) {
843 LOG(INFO) << "Receive " << to_string(updates_ptr);
844 }
845 if (!td_->auth_manager_->is_authorized()) {
846 if (updates_type == telegram_api::updateShort::ID && !G()->close_flag()) {
847 auto &update = static_cast<telegram_api::updateShort *>(updates_ptr.get())->update_;
848 auto update_id = update->get_id();
849 if (update_id == telegram_api::updateLoginToken::ID) {
850 td_->auth_manager_->on_update_login_token();
851 return promise.set_value(Unit());
852 }
853
854 switch (update_id) {
855 case telegram_api::updateServiceNotification::ID:
856 case telegram_api::updateDcOptions::ID:
857 case telegram_api::updateConfig::ID:
858 case telegram_api::updateLangPackTooLong::ID:
859 case telegram_api::updateLangPack::ID:
860 LOG(INFO) << "Apply without authorization " << to_string(updates_ptr);
861 downcast_call(*update, OnUpdate(this, update, std::move(promise)));
862 return;
863 default:
864 break;
865 }
866 }
867 LOG(INFO) << "Ignore received before authorization or after logout " << to_string(updates_ptr);
868 return promise.set_value(Unit());
869 }
870
871 switch (updates_type) {
872 case telegram_api::updatesTooLong::ID:
873 get_difference("updatesTooLong");
874 promise.set_value(Unit());
875 break;
876 case telegram_api::updateShortMessage::ID: {
877 auto update = move_tl_object_as<telegram_api::updateShortMessage>(updates_ptr);
878 if (update->flags_ & MessagesManager::MESSAGE_FLAG_HAS_REPLY_MARKUP) {
879 LOG(ERROR) << "Receive updateShortMessage with reply_markup";
880 update->flags_ ^= MessagesManager::MESSAGE_FLAG_HAS_REPLY_MARKUP;
881 }
882 if (update->flags_ & MessagesManager::MESSAGE_FLAG_HAS_MEDIA) {
883 LOG(ERROR) << "Receive updateShortMessage with media";
884 update->flags_ ^= MessagesManager::MESSAGE_FLAG_HAS_MEDIA;
885 }
886
887 auto from_id = update->flags_ & MessagesManager::MESSAGE_FLAG_IS_OUT ? td_->contacts_manager_->get_my_id().get()
888 : update->user_id_;
889 update->flags_ |= MessagesManager::MESSAGE_FLAG_HAS_FROM_ID;
890
891 auto message = make_tl_object<telegram_api::message>(
892 update->flags_, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
893 false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, update->id_,
894 make_tl_object<telegram_api::peerUser>(from_id), make_tl_object<telegram_api::peerUser>(update->user_id_),
895 std::move(update->fwd_from_), update->via_bot_id_, std::move(update->reply_to_), update->date_,
896 update->message_, nullptr, nullptr, std::move(update->entities_), 0, 0, nullptr, 0, string(), 0, Auto(),
897 update->ttl_period_);
898 on_pending_update(
899 make_tl_object<telegram_api::updateNewMessage>(std::move(message), update->pts_, update->pts_count_), 0,
900 std::move(promise), "telegram_api::updatesShortMessage");
901 break;
902 }
903 case telegram_api::updateShortChatMessage::ID: {
904 auto update = move_tl_object_as<telegram_api::updateShortChatMessage>(updates_ptr);
905 if (update->flags_ & MessagesManager::MESSAGE_FLAG_HAS_REPLY_MARKUP) {
906 LOG(ERROR) << "Receive updateShortChatMessage with reply_markup";
907 update->flags_ ^= MessagesManager::MESSAGE_FLAG_HAS_REPLY_MARKUP;
908 }
909 if (update->flags_ & MessagesManager::MESSAGE_FLAG_HAS_MEDIA) {
910 LOG(ERROR) << "Receive updateShortChatMessage with media";
911 update->flags_ ^= MessagesManager::MESSAGE_FLAG_HAS_MEDIA;
912 }
913
914 update->flags_ |= MessagesManager::MESSAGE_FLAG_HAS_FROM_ID;
915 auto message = make_tl_object<telegram_api::message>(
916 update->flags_, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
917 false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, update->id_,
918 make_tl_object<telegram_api::peerUser>(update->from_id_),
919 make_tl_object<telegram_api::peerChat>(update->chat_id_), std::move(update->fwd_from_), update->via_bot_id_,
920 std::move(update->reply_to_), update->date_, update->message_, nullptr, nullptr, std::move(update->entities_),
921 0, 0, nullptr, 0, string(), 0, Auto(), update->ttl_period_);
922 on_pending_update(
923 make_tl_object<telegram_api::updateNewMessage>(std::move(message), update->pts_, update->pts_count_), 0,
924 std::move(promise), "telegram_api::updatesShortChatMessage");
925 break;
926 }
927 case telegram_api::updateShort::ID: {
928 auto update = move_tl_object_as<telegram_api::updateShort>(updates_ptr);
929 LOG(DEBUG) << "Receive " << oneline(to_string(update));
930 if (!is_acceptable_update(update->update_.get())) {
931 LOG(ERROR) << "Receive unacceptable short update: " << oneline(to_string(update));
932 promise.set_value(Unit());
933 return get_difference("unacceptable short update");
934 }
935 short_update_date_ = update->date_;
936 downcast_call(*update->update_, OnUpdate(this, update->update_, std::move(promise)));
937 short_update_date_ = 0;
938 break;
939 }
940 case telegram_api::updatesCombined::ID: {
941 auto updates = move_tl_object_as<telegram_api::updatesCombined>(updates_ptr);
942 td_->contacts_manager_->on_get_users(std::move(updates->users_), "updatesCombined");
943 td_->contacts_manager_->on_get_chats(std::move(updates->chats_), "updatesCombined");
944 on_pending_updates(std::move(updates->updates_), updates->seq_start_, updates->seq_, updates->date_, Time::now(),
945 std::move(promise), "telegram_api::updatesCombined");
946 break;
947 }
948 case telegram_api::updates::ID: {
949 auto updates = move_tl_object_as<telegram_api::updates>(updates_ptr);
950 td_->contacts_manager_->on_get_users(std::move(updates->users_), "updates");
951 td_->contacts_manager_->on_get_chats(std::move(updates->chats_), "updates");
952 on_pending_updates(std::move(updates->updates_), updates->seq_, updates->seq_, updates->date_, Time::now(),
953 std::move(promise), "telegram_api::updates");
954 break;
955 }
956 case telegram_api::updateShortSentMessage::ID:
957 LOG(ERROR) << "Receive " << oneline(to_string(updates_ptr));
958 get_difference("updateShortSentMessage");
959 promise.set_value(Unit());
960 break;
961 default:
962 UNREACHABLE();
963 }
964 }
965
on_failed_get_updates_state(Status && error)966 void UpdatesManager::on_failed_get_updates_state(Status &&error) {
967 if (G()->close_flag() || !td_->auth_manager_->is_authorized()) {
968 return;
969 }
970 if (error.code() != 401) {
971 LOG(ERROR) << "Receive updates.getState error: " << error;
972 }
973
974 running_get_difference_ = false;
975 schedule_get_difference("on_failed_get_updates_state");
976 }
977
on_failed_get_difference(Status && error)978 void UpdatesManager::on_failed_get_difference(Status &&error) {
979 if (G()->close_flag() || !td_->auth_manager_->is_authorized()) {
980 return;
981 }
982 if (error.code() != 401) {
983 LOG(ERROR) << "Receive updates.getDifference error: " << error;
984 }
985
986 running_get_difference_ = false;
987 schedule_get_difference("on_failed_get_difference");
988
989 if (error.message() == Slice("PERSISTENT_TIMESTAMP_INVALID")) {
990 set_pts(std::numeric_limits<int32>::max(), "PERSISTENT_TIMESTAMP_INVALID").set_value(Unit());
991 }
992 }
993
schedule_get_difference(const char * source)994 void UpdatesManager::schedule_get_difference(const char *source) {
995 if (G()->close_flag() || !td_->auth_manager_->is_authorized()) {
996 return;
997 }
998 if (!retry_timeout_.has_timeout()) {
999 LOG(WARNING) << "Schedule getDifference in " << retry_time_ << " seconds with pts = " << get_pts()
1000 << ", qts = " << get_qts() << ", date = " << get_date() << " from " << source;
1001 retry_timeout_.set_callback(std::move(fill_get_difference_gap));
1002 retry_timeout_.set_callback_data(static_cast<void *>(td_));
1003 retry_timeout_.set_timeout_in(retry_time_);
1004 retry_time_ *= 2;
1005 if (retry_time_ > 60) {
1006 retry_time_ = Random::fast(60, 80);
1007 }
1008 } else {
1009 VLOG(get_difference) << "Schedule getDifference from " << source;
1010 }
1011 }
1012
on_get_updates_state(tl_object_ptr<telegram_api::updates_state> && state,const char * source)1013 void UpdatesManager::on_get_updates_state(tl_object_ptr<telegram_api::updates_state> &&state, const char *source) {
1014 CHECK(state != nullptr);
1015
1016 VLOG(get_difference) << "Receive " << oneline(to_string(state)) << " from " << source;
1017 // TODO use state->unread_count;
1018
1019 if (get_pts() == std::numeric_limits<int32>::max()) {
1020 LOG(WARNING) << "Restore pts to " << state->pts_;
1021 // restoring right pts
1022 pts_manager_.init(state->pts_);
1023 last_get_difference_pts_ = get_pts();
1024 last_pts_save_time_ = Time::now() - 2 * MAX_PTS_SAVE_DELAY;
1025 save_pts(state->pts_);
1026 } else {
1027 string full_source = "on_get_updates_state " + oneline(to_string(state)) + " from " + source;
1028 set_pts(state->pts_, full_source.c_str()).set_value(Unit());
1029 set_date(state->date_, false, std::move(full_source));
1030 add_qts(state->qts_).set_value(Unit());
1031
1032 seq_ = state->seq_;
1033 }
1034
1035 if (running_get_difference_) { // called from getUpdatesState
1036 running_get_difference_ = false;
1037 after_get_difference();
1038 }
1039 }
1040
get_updates(const telegram_api::Updates * updates_ptr)1041 const vector<tl_object_ptr<telegram_api::Update>> *UpdatesManager::get_updates(
1042 const telegram_api::Updates *updates_ptr) {
1043 switch (updates_ptr->get_id()) {
1044 case telegram_api::updatesTooLong::ID:
1045 case telegram_api::updateShortMessage::ID:
1046 case telegram_api::updateShortChatMessage::ID:
1047 case telegram_api::updateShort::ID:
1048 case telegram_api::updateShortSentMessage::ID:
1049 LOG(ERROR) << "Receive " << oneline(to_string(*updates_ptr)) << " instead of updates";
1050 return nullptr;
1051 case telegram_api::updatesCombined::ID:
1052 return &static_cast<const telegram_api::updatesCombined *>(updates_ptr)->updates_;
1053 case telegram_api::updates::ID:
1054 return &static_cast<const telegram_api::updates *>(updates_ptr)->updates_;
1055 default:
1056 UNREACHABLE();
1057 return nullptr;
1058 }
1059 }
1060
get_updates(telegram_api::Updates * updates_ptr)1061 vector<tl_object_ptr<telegram_api::Update>> *UpdatesManager::get_updates(telegram_api::Updates *updates_ptr) {
1062 return const_cast<vector<tl_object_ptr<telegram_api::Update>> *>(
1063 get_updates(static_cast<const telegram_api::Updates *>(updates_ptr)));
1064 }
1065
get_sent_messages_random_ids(const telegram_api::Updates * updates_ptr)1066 std::unordered_set<int64> UpdatesManager::get_sent_messages_random_ids(const telegram_api::Updates *updates_ptr) {
1067 std::unordered_set<int64> random_ids;
1068 auto updates = get_updates(updates_ptr);
1069 if (updates != nullptr) {
1070 for (auto &update : *updates) {
1071 if (update->get_id() == telegram_api::updateMessageID::ID) {
1072 int64 random_id = static_cast<const telegram_api::updateMessageID *>(update.get())->random_id_;
1073 if (!random_ids.insert(random_id).second) {
1074 LOG(ERROR) << "Receive twice updateMessageID for " << random_id;
1075 }
1076 }
1077 }
1078 }
1079 return random_ids;
1080 }
1081
get_new_messages(const telegram_api::Updates * updates_ptr)1082 vector<const tl_object_ptr<telegram_api::Message> *> UpdatesManager::get_new_messages(
1083 const telegram_api::Updates *updates_ptr) {
1084 vector<const tl_object_ptr<telegram_api::Message> *> messages;
1085 auto updates = get_updates(updates_ptr);
1086 if (updates != nullptr) {
1087 for (auto &update : *updates) {
1088 auto constructor_id = update->get_id();
1089 if (constructor_id == telegram_api::updateNewMessage::ID) {
1090 messages.emplace_back(&static_cast<const telegram_api::updateNewMessage *>(update.get())->message_);
1091 } else if (constructor_id == telegram_api::updateNewChannelMessage::ID) {
1092 messages.emplace_back(&static_cast<const telegram_api::updateNewChannelMessage *>(update.get())->message_);
1093 } else if (constructor_id == telegram_api::updateNewScheduledMessage::ID) {
1094 messages.emplace_back(&static_cast<const telegram_api::updateNewScheduledMessage *>(update.get())->message_);
1095 }
1096 }
1097 }
1098 return messages;
1099 }
1100
get_update_new_group_call_ids(const telegram_api::Updates * updates_ptr)1101 vector<InputGroupCallId> UpdatesManager::get_update_new_group_call_ids(const telegram_api::Updates *updates_ptr) {
1102 vector<InputGroupCallId> input_group_call_ids;
1103 auto updates = get_updates(updates_ptr);
1104 if (updates != nullptr) {
1105 for (auto &update : *updates) {
1106 InputGroupCallId input_group_call_id;
1107 if (update->get_id() == telegram_api::updateGroupCall::ID) {
1108 auto group_call_ptr = static_cast<const telegram_api::updateGroupCall *>(update.get())->call_.get();
1109 if (group_call_ptr->get_id() == telegram_api::groupCall::ID) {
1110 auto group_call = static_cast<const telegram_api::groupCall *>(group_call_ptr);
1111 input_group_call_id = InputGroupCallId(group_call->id_, group_call->access_hash_);
1112 }
1113 }
1114
1115 if (input_group_call_id.is_valid()) {
1116 input_group_call_ids.push_back(input_group_call_id);
1117 }
1118 }
1119 }
1120 return input_group_call_ids;
1121 }
1122
extract_join_group_call_presentation_params(telegram_api::Updates * updates_ptr)1123 string UpdatesManager::extract_join_group_call_presentation_params(telegram_api::Updates *updates_ptr) {
1124 auto updates = get_updates(updates_ptr);
1125 for (auto it = updates->begin(); it != updates->end(); ++it) {
1126 auto *update = it->get();
1127 if (update->get_id() == telegram_api::updateGroupCallConnection::ID &&
1128 static_cast<const telegram_api::updateGroupCallConnection *>(update)->presentation_) {
1129 string result = std::move(static_cast<telegram_api::updateGroupCallConnection *>(update)->params_->data_);
1130 updates->erase(it);
1131 return result;
1132 }
1133 }
1134 return string();
1135 }
1136
get_update_notify_settings_dialog_ids(const telegram_api::Updates * updates_ptr)1137 vector<DialogId> UpdatesManager::get_update_notify_settings_dialog_ids(const telegram_api::Updates *updates_ptr) {
1138 vector<DialogId> dialog_ids;
1139 auto updates = get_updates(updates_ptr);
1140 if (updates != nullptr) {
1141 dialog_ids.reserve(updates->size());
1142 for (auto &update : *updates) {
1143 DialogId dialog_id;
1144 if (update->get_id() == telegram_api::updateNotifySettings::ID) {
1145 auto notify_peer = static_cast<const telegram_api::updateNotifySettings *>(update.get())->peer_.get();
1146 if (notify_peer->get_id() == telegram_api::notifyPeer::ID) {
1147 dialog_id = DialogId(static_cast<const telegram_api::notifyPeer *>(notify_peer)->peer_);
1148 }
1149 }
1150
1151 if (dialog_id.is_valid()) {
1152 dialog_ids.push_back(dialog_id);
1153 } else {
1154 LOG(ERROR) << "Receive unexpected " << to_string(update);
1155 }
1156 }
1157 }
1158 return dialog_ids;
1159 }
1160
get_chat_dialog_ids(const telegram_api::Updates * updates_ptr)1161 vector<DialogId> UpdatesManager::get_chat_dialog_ids(const telegram_api::Updates *updates_ptr) {
1162 const vector<tl_object_ptr<telegram_api::Chat>> *chats = nullptr;
1163 switch (updates_ptr->get_id()) {
1164 case telegram_api::updatesTooLong::ID:
1165 case telegram_api::updateShortMessage::ID:
1166 case telegram_api::updateShortChatMessage::ID:
1167 case telegram_api::updateShort::ID:
1168 case telegram_api::updateShortSentMessage::ID:
1169 LOG(ERROR) << "Receive " << oneline(to_string(*updates_ptr)) << " instead of updates";
1170 break;
1171 case telegram_api::updatesCombined::ID: {
1172 chats = &static_cast<const telegram_api::updatesCombined *>(updates_ptr)->chats_;
1173 break;
1174 }
1175 case telegram_api::updates::ID: {
1176 chats = &static_cast<const telegram_api::updates *>(updates_ptr)->chats_;
1177 break;
1178 }
1179 default:
1180 UNREACHABLE();
1181 }
1182
1183 if (chats == nullptr) {
1184 return {};
1185 }
1186
1187 vector<DialogId> dialog_ids;
1188 dialog_ids.reserve(chats->size());
1189 for (const auto &chat : *chats) {
1190 auto chat_id = ContactsManager::get_chat_id(chat);
1191 if (chat_id.is_valid()) {
1192 dialog_ids.push_back(DialogId(chat_id));
1193 continue;
1194 }
1195
1196 auto channel_id = ContactsManager::get_channel_id(chat);
1197 if (channel_id.is_valid()) {
1198 dialog_ids.push_back(DialogId(channel_id));
1199 continue;
1200 }
1201
1202 LOG(ERROR) << "Can't find identifier of " << oneline(to_string(chat));
1203 }
1204 return dialog_ids;
1205 }
1206
get_update_edit_message_pts(const telegram_api::Updates * updates_ptr)1207 int32 UpdatesManager::get_update_edit_message_pts(const telegram_api::Updates *updates_ptr) {
1208 int32 pts = 0;
1209 auto updates = get_updates(updates_ptr);
1210 if (updates != nullptr) {
1211 for (auto &update : *updates) {
1212 int32 update_pts = [&] {
1213 switch (update->get_id()) {
1214 case telegram_api::updateEditMessage::ID:
1215 return static_cast<const telegram_api::updateEditMessage *>(update.get())->pts_;
1216 case telegram_api::updateEditChannelMessage::ID:
1217 return static_cast<const telegram_api::updateEditChannelMessage *>(update.get())->pts_;
1218 default:
1219 return 0;
1220 }
1221 }();
1222 if (update_pts != 0) {
1223 if (pts == 0) {
1224 pts = update_pts;
1225 } else {
1226 pts = -1;
1227 }
1228 }
1229 }
1230 }
1231 if (pts == -1) {
1232 LOG(ERROR) << "Receive multiple edit message updates in " << to_string(*updates_ptr);
1233 pts = 0;
1234 }
1235 return pts;
1236 }
1237
init_state()1238 void UpdatesManager::init_state() {
1239 if (G()->close_flag() || !td_->auth_manager_->is_authorized()) {
1240 return;
1241 }
1242
1243 auto pmc = G()->td_db()->get_binlog_pmc();
1244 if (G()->ignore_background_updates()) {
1245 // just in case
1246 pmc->erase("updates.pts");
1247 pmc->erase("updates.qts");
1248 pmc->erase("updates.date");
1249 }
1250 string pts_str = pmc->get("updates.pts");
1251 if (pts_str.empty()) {
1252 if (!running_get_difference_) {
1253 running_get_difference_ = true;
1254
1255 before_get_difference(true);
1256
1257 auto promise = PromiseCreator::lambda([](Result<tl_object_ptr<telegram_api::updates_state>> result) {
1258 if (result.is_ok()) {
1259 send_closure(G()->updates_manager(), &UpdatesManager::on_get_updates_state, result.move_as_ok(),
1260 "GetUpdatesStateQuery");
1261 } else {
1262 send_closure(G()->updates_manager(), &UpdatesManager::on_failed_get_updates_state, result.move_as_error());
1263 }
1264 });
1265 td_->create_handler<GetUpdatesStateQuery>(std::move(promise))->send();
1266 }
1267 return;
1268 }
1269 pts_manager_.init(to_integer<int32>(pts_str));
1270 last_get_difference_pts_ = get_pts();
1271 qts_manager_.init(to_integer<int32>(pmc->get("updates.qts")));
1272 last_get_difference_qts_ = get_qts();
1273 date_ = to_integer<int32>(pmc->get("updates.date"));
1274 date_source_ = "database";
1275 LOG(DEBUG) << "Init: " << get_pts() << " " << get_qts() << " " << date_;
1276
1277 get_difference("init_state");
1278 }
1279
ping_server()1280 void UpdatesManager::ping_server() {
1281 auto promise = PromiseCreator::lambda([](Result<tl_object_ptr<telegram_api::updates_state>> result) {
1282 auto state = result.is_ok() ? result.move_as_ok() : nullptr;
1283 send_closure(G()->updates_manager(), &UpdatesManager::on_server_pong, std::move(state));
1284 });
1285 td_->create_handler<PingServerQuery>(std::move(promise))->send();
1286 }
1287
on_server_pong(tl_object_ptr<telegram_api::updates_state> && state)1288 void UpdatesManager::on_server_pong(tl_object_ptr<telegram_api::updates_state> &&state) {
1289 LOG(INFO) << "Receive " << oneline(to_string(state));
1290 if (state == nullptr || state->pts_ > get_pts() || state->seq_ > seq_) {
1291 get_difference("on server pong");
1292 }
1293 }
1294
process_get_difference_updates(vector<tl_object_ptr<telegram_api::Message>> && new_messages,vector<tl_object_ptr<telegram_api::EncryptedMessage>> && new_encrypted_messages,vector<tl_object_ptr<telegram_api::Update>> && other_updates)1295 void UpdatesManager::process_get_difference_updates(
1296 vector<tl_object_ptr<telegram_api::Message>> &&new_messages,
1297 vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages,
1298 vector<tl_object_ptr<telegram_api::Update>> &&other_updates) {
1299 VLOG(get_difference) << "In get difference receive " << new_messages.size() << " messages, "
1300 << new_encrypted_messages.size() << " encrypted messages and " << other_updates.size()
1301 << " other updates";
1302 for (auto &update : other_updates) {
1303 auto constructor_id = update->get_id();
1304 if (constructor_id == telegram_api::updateMessageID::ID) {
1305 // in getDifference updateMessageID can't be received for scheduled messages
1306 LOG(INFO) << "Receive update about sent message " << to_string(update);
1307 auto update_message_id = move_tl_object_as<telegram_api::updateMessageID>(update);
1308 td_->messages_manager_->on_update_message_id(update_message_id->random_id_,
1309 MessageId(ServerMessageId(update_message_id->id_)), "getDifference");
1310 CHECK(!running_get_difference_);
1311 }
1312
1313 if (constructor_id == telegram_api::updateEncryption::ID) {
1314 on_update(move_tl_object_as<telegram_api::updateEncryption>(update), Promise<Unit>());
1315 CHECK(!running_get_difference_);
1316 }
1317
1318 if (constructor_id == telegram_api::updateFolderPeers::ID) {
1319 auto update_folder_peers = move_tl_object_as<telegram_api::updateFolderPeers>(update);
1320 if (update_folder_peers->pts_count_ != 0) {
1321 LOG(ERROR) << "Receive updateFolderPeers with pts_count = " << update_folder_peers->pts_count_;
1322 update_folder_peers->pts_count_ = 0;
1323 }
1324 update_folder_peers->pts_ = 0;
1325 on_update(std::move(update_folder_peers), Promise<Unit>());
1326 CHECK(!running_get_difference_);
1327 }
1328
1329 if (constructor_id == telegram_api::updateChat::ID) {
1330 update = nullptr;
1331 }
1332
1333 if (constructor_id == telegram_api::updateChannel::ID) {
1334 update = nullptr;
1335 }
1336
1337 /*
1338 // TODO can't apply it here, because dialog may not be created yet
1339 // process updateReadHistoryInbox before new messages
1340 if (constructor_id == telegram_api::updateReadHistoryInbox::ID) {
1341 static_cast<telegram_api::updateReadHistoryInbox *>(update.get())->still_unread_count_ = -1;
1342 process_pts_update(std::move(update));
1343 CHECK(!running_get_difference_);
1344 }
1345 */
1346 }
1347
1348 for (auto &message : new_messages) {
1349 // channel messages must not be received in this vector
1350 td_->messages_manager_->on_get_message(std::move(message), true, false, false, true, true, "get difference");
1351 CHECK(!running_get_difference_);
1352 }
1353
1354 for (auto &encrypted_message : new_encrypted_messages) {
1355 send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(encrypted_message),
1356 Promise<Unit>());
1357 }
1358
1359 process_updates(std::move(other_updates), true, Promise<Unit>());
1360 }
1361
on_get_difference(tl_object_ptr<telegram_api::updates_Difference> && difference_ptr)1362 void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Difference> &&difference_ptr) {
1363 VLOG(get_difference) << "----- END GET DIFFERENCE-----";
1364 running_get_difference_ = false;
1365
1366 if (!td_->auth_manager_->is_authorized()) {
1367 // just in case
1368 return;
1369 }
1370
1371 LOG(DEBUG) << "Result of get difference: " << to_string(difference_ptr);
1372
1373 CHECK(difference_ptr != nullptr);
1374 switch (difference_ptr->get_id()) {
1375 case telegram_api::updates_differenceEmpty::ID: {
1376 auto difference = move_tl_object_as<telegram_api::updates_differenceEmpty>(difference_ptr);
1377 set_date(difference->date_, false, "on_get_difference_empty");
1378 seq_ = difference->seq_;
1379
1380 process_pending_qts_updates();
1381 if (!pending_qts_updates_.empty()) {
1382 LOG(WARNING) << "Drop " << pending_qts_updates_.size() << " pending qts updates after receive empty difference";
1383 auto pending_qts_updates = std::move(pending_qts_updates_);
1384 pending_qts_updates_.clear();
1385
1386 for (auto &pending_update : pending_qts_updates) {
1387 auto promises = std::move(pending_update.second.promises);
1388 for (auto &promise : promises) {
1389 promise.set_value(Unit());
1390 }
1391 }
1392 }
1393
1394 process_pending_seq_updates();
1395 if (!pending_seq_updates_.empty()) {
1396 LOG(WARNING) << "Drop " << pending_seq_updates_.size() << " pending seq updates after receive empty difference";
1397 auto pending_seq_updates = std::move(pending_seq_updates_);
1398 pending_seq_updates_.clear();
1399
1400 for (auto &pending_update : pending_seq_updates) {
1401 pending_update.second.promise.set_value(Unit());
1402 }
1403 }
1404 break;
1405 }
1406 case telegram_api::updates_difference::ID: {
1407 auto difference = move_tl_object_as<telegram_api::updates_difference>(difference_ptr);
1408 VLOG(get_difference) << "In get difference receive " << difference->users_.size() << " users and "
1409 << difference->chats_.size() << " chats";
1410 td_->contacts_manager_->on_get_users(std::move(difference->users_), "updates.difference");
1411 td_->contacts_manager_->on_get_chats(std::move(difference->chats_), "updates.difference");
1412
1413 process_get_difference_updates(std::move(difference->new_messages_),
1414 std::move(difference->new_encrypted_messages_),
1415 std::move(difference->other_updates_));
1416 if (running_get_difference_) {
1417 LOG(ERROR) << "Get difference has run while processing get difference updates";
1418 break;
1419 }
1420
1421 on_get_updates_state(std::move(difference->state_), "get difference");
1422 break;
1423 }
1424 case telegram_api::updates_differenceSlice::ID: {
1425 auto difference = move_tl_object_as<telegram_api::updates_differenceSlice>(difference_ptr);
1426 bool is_pts_changed = have_update_pts_changed(difference->other_updates_);
1427 if (difference->intermediate_state_->pts_ >= get_pts() && get_pts() != std::numeric_limits<int32>::max() &&
1428 difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == get_qts() &&
1429 !is_pts_changed) {
1430 // TODO send new getDifference request and apply difference slice only after that
1431 }
1432
1433 VLOG(get_difference) << "In get difference receive " << difference->users_.size() << " users and "
1434 << difference->chats_.size() << " chats";
1435 td_->contacts_manager_->on_get_users(std::move(difference->users_), "updates.differenceSlice");
1436 td_->contacts_manager_->on_get_chats(std::move(difference->chats_), "updates.differenceSlice");
1437
1438 process_get_difference_updates(std::move(difference->new_messages_),
1439 std::move(difference->new_encrypted_messages_),
1440 std::move(difference->other_updates_));
1441 if (running_get_difference_) {
1442 if (!is_pts_changed) {
1443 LOG(ERROR) << "Get difference has run while processing get difference updates";
1444 }
1445 break;
1446 }
1447 CHECK(!is_pts_changed);
1448
1449 auto old_pts = get_pts();
1450 auto old_date = get_date();
1451 auto old_qts = get_qts();
1452 on_get_updates_state(std::move(difference->intermediate_state_), "get difference slice");
1453
1454 process_postponed_pts_updates();
1455 process_pending_qts_updates();
1456
1457 auto new_pts = get_pts();
1458 auto new_date = get_date();
1459 auto new_qts = get_qts();
1460 if (old_pts != std::numeric_limits<int32>::max() && new_date == old_date &&
1461 (new_pts == old_pts || (min_postponed_update_pts_ != 0 && new_pts >= min_postponed_update_pts_)) &&
1462 (new_qts == old_qts || (min_postponed_update_qts_ != 0 && new_qts >= min_postponed_update_qts_))) {
1463 VLOG(get_difference) << "Switch back from getDifference to update processing";
1464 break;
1465 }
1466
1467 if (new_pts != -1) { // just in case
1468 run_get_difference(true, "on updates_differenceSlice");
1469 }
1470 break;
1471 }
1472 case telegram_api::updates_differenceTooLong::ID: {
1473 LOG(ERROR) << "Receive differenceTooLong";
1474 // TODO
1475 auto difference = move_tl_object_as<telegram_api::updates_differenceTooLong>(difference_ptr);
1476 set_pts(difference->pts_, "differenceTooLong").set_value(Unit());
1477 get_difference("on updates_differenceTooLong");
1478 break;
1479 }
1480 default:
1481 UNREACHABLE();
1482 }
1483
1484 if (!running_get_difference_) {
1485 after_get_difference();
1486 }
1487 }
1488
after_get_difference()1489 void UpdatesManager::after_get_difference() {
1490 CHECK(!running_get_difference_);
1491
1492 retry_timeout_.cancel_timeout();
1493 retry_time_ = 1;
1494
1495 // cancels qts_gap_timeout_ if needed, can apply some updates received during getDifference,
1496 // but missed in getDifference
1497 process_pending_qts_updates();
1498
1499 // cancels seq_gap_timeout_ if needed, can apply some updates received during getDifference,
1500 // but missed in getDifference
1501 process_pending_seq_updates();
1502
1503 if (running_get_difference_) {
1504 return;
1505 }
1506
1507 if (!postponed_updates_.empty()) {
1508 VLOG(get_difference) << "Begin to apply " << postponed_updates_.size() << " postponed update chunks";
1509 size_t total_update_count = 0;
1510 while (!postponed_updates_.empty()) {
1511 auto it = postponed_updates_.begin();
1512 auto updates = std::move(it->second.updates);
1513 auto updates_seq_begin = it->second.seq_begin;
1514 auto updates_seq_end = it->second.seq_end;
1515 auto receive_time = it->second.receive_time;
1516 auto promise = std::move(it->second.promise);
1517 // ignore it->second.date, because it may be too old
1518 postponed_updates_.erase(it);
1519 auto update_count = updates.size();
1520 on_pending_updates(std::move(updates), updates_seq_begin, updates_seq_end, 0, receive_time, std::move(promise),
1521 "postponed updates");
1522 if (running_get_difference_) {
1523 VLOG(get_difference) << "Finish to apply postponed updates with " << postponed_updates_.size()
1524 << " updates left after applied " << total_update_count
1525 << " updates, because forced to run getDifference";
1526 return;
1527 }
1528 total_update_count += update_count;
1529 }
1530 VLOG(get_difference) << "Finish to apply " << total_update_count << " postponed updates";
1531 }
1532
1533 if (!postponed_pts_updates_.empty()) { // must be before td_->messages_manager_->after_get_difference()
1534 auto postponed_updates = std::move(postponed_pts_updates_);
1535 postponed_pts_updates_.clear();
1536
1537 VLOG(get_difference) << "Begin to apply " << postponed_updates.size()
1538 << " postponed pts updates with pts = " << get_pts();
1539 for (auto &postponed_update : postponed_updates) {
1540 auto &update = postponed_update.second;
1541 add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, update.receive_time,
1542 std::move(update.promise), "after get difference");
1543 CHECK(!running_get_difference_);
1544 }
1545 VLOG(get_difference) << "After applying postponed pts updates have pts = " << get_pts()
1546 << ", max_pts = " << accumulated_pts_ << " and " << pending_pts_updates_.size() << " + "
1547 << postponed_pts_updates_.size() << " pending pts updates";
1548 }
1549
1550 td_->animations_manager_->after_get_difference();
1551 td_->contacts_manager_->after_get_difference();
1552 td_->inline_queries_manager_->after_get_difference();
1553 td_->messages_manager_->after_get_difference();
1554 td_->stickers_manager_->after_get_difference();
1555 send_closure_later(td_->notification_manager_actor_, &NotificationManager::after_get_difference);
1556 send_closure(G()->state_manager(), &StateManager::on_synchronized, true);
1557 }
1558
on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> && updates,int32 seq_begin,int32 seq_end,int32 date,double receive_time,Promise<Unit> && promise,const char * source)1559 void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, int32 seq_begin,
1560 int32 seq_end, int32 date, double receive_time, Promise<Unit> &&promise,
1561 const char *source) {
1562 if (get_pts() == -1) {
1563 init_state();
1564 }
1565
1566 if (!td_->auth_manager_->is_authorized()) {
1567 LOG(INFO) << "Ignore updates received before authorization or after logout";
1568 return promise.set_value(Unit());
1569 }
1570
1571 // for (auto &update : updates) {
1572 // LOG(WARNING) << "Receive update " << to_string(update.get());
1573 // }
1574
1575 if (seq_begin < 0 || seq_end < 0 || date < 0 || seq_end < seq_begin) {
1576 LOG(ERROR) << "Wrong updates parameters seq_begin = " << seq_begin << ", seq_end = " << seq_end
1577 << ", date = " << date << " from " << source;
1578 get_difference("on wrong updates in on_pending_updates");
1579 return promise.set_value(Unit());
1580 }
1581
1582 for (auto &update : updates) {
1583 if (update != nullptr) {
1584 switch (update->get_id()) {
1585 case telegram_api::updateUserTyping::ID:
1586 case telegram_api::updateChatUserTyping::ID:
1587 case telegram_api::updateChannelUserTyping::ID:
1588 case telegram_api::updateEncryptedChatTyping::ID:
1589 case telegram_api::updateLoginToken::ID:
1590 case telegram_api::updateDcOptions::ID:
1591 case telegram_api::updateConfig::ID:
1592 case telegram_api::updateServiceNotification::ID:
1593 case telegram_api::updateLangPackTooLong::ID:
1594 case telegram_api::updateLangPack::ID:
1595 short_update_date_ = date;
1596 LOG(INFO) << "Process short " << oneline(to_string(update));
1597 // don't need promise for short update
1598 downcast_call(*update, OnUpdate(this, update, Promise<Unit>()));
1599 short_update_date_ = 0;
1600 update = nullptr;
1601 break;
1602 default:
1603 break;
1604 }
1605 }
1606 }
1607
1608 bool need_postpone = running_get_difference_ /*|| string(source) != string("postponed updates")*/;
1609 if (!need_postpone) {
1610 for (auto &update : updates) {
1611 if (!is_acceptable_update(update.get())) {
1612 CHECK(update != nullptr);
1613 int32 id = update->get_id();
1614 const tl_object_ptr<telegram_api::Message> *message_ptr = nullptr;
1615 int32 pts = 0;
1616 if (id == telegram_api::updateNewChannelMessage::ID) {
1617 auto update_new_channel_message = static_cast<const telegram_api::updateNewChannelMessage *>(update.get());
1618 message_ptr = &update_new_channel_message->message_;
1619 pts = update_new_channel_message->pts_;
1620 }
1621 if (id == telegram_api::updateEditChannelMessage::ID) {
1622 auto update_edit_channel_message = static_cast<const telegram_api::updateEditChannelMessage *>(update.get());
1623 message_ptr = &update_edit_channel_message->message_;
1624 pts = update_edit_channel_message->pts_;
1625 }
1626
1627 // for channels we can try to replace unacceptable update with updateChannelTooLong
1628 if (message_ptr != nullptr) {
1629 auto dialog_id = td_->messages_manager_->get_message_dialog_id(*message_ptr);
1630 if (dialog_id.get_type() == DialogType::Channel) {
1631 auto channel_id = dialog_id.get_channel_id();
1632 if (td_->contacts_manager_->have_channel_force(channel_id)) {
1633 if (td_->messages_manager_->is_old_channel_update(dialog_id, pts)) {
1634 // the update will be ignored anyway, so there is no reason to replace it or force get_difference
1635 LOG(INFO) << "Allow an outdated unacceptable update from " << source;
1636 continue;
1637 }
1638 if ((*message_ptr)->get_id() != telegram_api::messageService::ID) {
1639 // don't replace service messages, because they can be about bot's kicking
1640 LOG(INFO) << "Replace update about new message with updateChannelTooLong in " << dialog_id;
1641 update = telegram_api::make_object<telegram_api::updateChannelTooLong>(
1642 telegram_api::updateChannelTooLong::PTS_MASK, channel_id.get(), pts);
1643 continue;
1644 }
1645 }
1646 } else {
1647 LOG(ERROR) << "Update is not from a channel: " << to_string(update);
1648 }
1649 }
1650
1651 get_difference("on unacceptable updates in on_pending_updates");
1652 return promise.set_value(Unit());
1653 }
1654 }
1655
1656 if (date > 0 && updates.size() == 1 && updates[0] != nullptr &&
1657 updates[0]->get_id() == telegram_api::updateReadHistoryOutbox::ID) {
1658 auto update = static_cast<const telegram_api::updateReadHistoryOutbox *>(updates[0].get());
1659 DialogId dialog_id(update->peer_);
1660 if (dialog_id.get_type() == DialogType::User) {
1661 auto user_id = dialog_id.get_user_id();
1662 if (user_id.is_valid()) {
1663 td_->contacts_manager_->on_update_user_local_was_online(user_id, date);
1664 }
1665 }
1666 }
1667 }
1668
1669 size_t ordinary_new_message_count = 0;
1670 size_t scheduled_new_message_count = 0;
1671 for (auto &update : updates) {
1672 if (update != nullptr) {
1673 auto constructor_id = update->get_id();
1674 if (constructor_id == telegram_api::updateNewMessage::ID ||
1675 constructor_id == telegram_api::updateNewChannelMessage::ID) {
1676 ordinary_new_message_count++;
1677 } else if (constructor_id == telegram_api::updateNewScheduledMessage::ID) {
1678 scheduled_new_message_count++;
1679 }
1680 }
1681 }
1682
1683 if (ordinary_new_message_count != 0 && scheduled_new_message_count != 0) {
1684 LOG(ERROR) << "Receive mixed message types in updates:";
1685 for (auto &update : updates) {
1686 LOG(ERROR) << "Update: " << oneline(to_string(update));
1687 }
1688 if (!running_get_difference_) {
1689 schedule_get_difference("on_get_wrong_updates");
1690 }
1691 return promise.set_value(Unit());
1692 }
1693
1694 MultiPromiseActorSafe mpas{"OnPendingUpdatesMultiPromiseActor"};
1695 mpas.add_promise([actor_id = create_reference(), promise = std::move(promise)](Result<Unit> &&result) mutable {
1696 send_closure(actor_id, &UpdatesManager::on_pending_updates_processed, std::move(result), std::move(promise));
1697 });
1698 auto lock = mpas.get_promise();
1699
1700 for (auto &update : updates) {
1701 if (update != nullptr) {
1702 LOG(INFO) << "Receive from " << source << " pending " << to_string(update);
1703 int32 id = update->get_id();
1704 if (id == telegram_api::updateMessageID::ID) {
1705 LOG(INFO) << "Receive from " << source << " " << to_string(update);
1706 auto sent_message_update = move_tl_object_as<telegram_api::updateMessageID>(update);
1707 bool success = false;
1708 if (ordinary_new_message_count != 0) {
1709 success = td_->messages_manager_->on_update_message_id(
1710 sent_message_update->random_id_, MessageId(ServerMessageId(sent_message_update->id_)), source);
1711 } else if (scheduled_new_message_count != 0) {
1712 success = td_->messages_manager_->on_update_scheduled_message_id(
1713 sent_message_update->random_id_, ScheduledServerMessageId(sent_message_update->id_), source);
1714 }
1715 if (!success) {
1716 for (auto &debug_update : updates) {
1717 LOG(ERROR) << "Update: " << oneline(to_string(debug_update));
1718 }
1719 }
1720 update = nullptr;
1721 }
1722 if (id == telegram_api::updateFolderPeers::ID) {
1723 on_update(move_tl_object_as<telegram_api::updateFolderPeers>(update), mpas.get_promise());
1724 update = nullptr;
1725 }
1726 if (id == telegram_api::updateEncryption::ID) {
1727 on_update(move_tl_object_as<telegram_api::updateEncryption>(update), mpas.get_promise());
1728 update = nullptr;
1729 }
1730 CHECK(need_postpone || !running_get_difference_);
1731 }
1732 }
1733
1734 for (auto &update : updates) {
1735 if (update != nullptr) {
1736 if (is_pts_update(update.get())) {
1737 if (running_get_difference_) {
1738 auto pts = get_update_pts(update.get());
1739 if (pts != 0 && (min_postponed_update_pts_ == 0 || pts < min_postponed_update_pts_)) {
1740 min_postponed_update_pts_ = pts;
1741 }
1742 }
1743 downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
1744 update = nullptr;
1745 } else if (is_qts_update(update.get())) {
1746 if (running_get_difference_) {
1747 auto qts = get_update_qts(update.get());
1748 if (qts != 0 && (min_postponed_update_qts_ == 0 || qts < min_postponed_update_qts_)) {
1749 min_postponed_update_qts_ = qts;
1750 }
1751 }
1752 downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
1753 update = nullptr;
1754 }
1755 }
1756 }
1757
1758 if (seq_begin == 0 && seq_end == 0) {
1759 bool have_updates = false;
1760 for (auto &update : updates) {
1761 if (update != nullptr) {
1762 have_updates = true;
1763 break;
1764 }
1765 }
1766 if (!have_updates) {
1767 LOG(INFO) << "All updates were processed";
1768 return lock.set_value(Unit());
1769 }
1770 }
1771
1772 if (need_postpone || running_get_difference_) {
1773 LOG(INFO) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
1774 << "] with date = " << date << " from " << source;
1775 if (!need_postpone) {
1776 LOG(ERROR) << "Run get difference while applying updates from " << source;
1777 }
1778 postponed_updates_.emplace(
1779 seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), mpas.get_promise()));
1780 return lock.set_value(Unit());
1781 }
1782
1783 if (seq_begin == 0 || seq_begin == seq_ + 1) {
1784 LOG(INFO) << "Process " << updates.size() << " updates [" << seq_begin << ", " << seq_end
1785 << "] with date = " << date << " from " << source;
1786 process_seq_updates(seq_end, date, std::move(updates), mpas.get_promise());
1787 process_pending_seq_updates();
1788 return lock.set_value(Unit());
1789 }
1790
1791 if (seq_begin <= seq_) {
1792 if (seq_ >= (1 << 30) && seq_begin < seq_ - (1 << 30)) {
1793 set_seq_gap_timeout(0.001);
1794 }
1795 if (seq_end > seq_) {
1796 LOG(ERROR) << "Receive updates with seq_begin = " << seq_begin << ", seq_end = " << seq_end
1797 << ", but seq = " << seq_ << " from " << source;
1798 } else {
1799 LOG(INFO) << "Receive old updates with seq_begin = " << seq_begin << ", seq_end = " << seq_end
1800 << ", but seq = " << seq_ << " from " << source;
1801 }
1802 return lock.set_value(Unit());
1803 }
1804
1805 LOG(INFO) << "Gap in seq has found. Receive " << updates.size() << " updates [" << seq_begin << ", " << seq_end
1806 << "] from " << source << ", but seq = " << seq_;
1807 LOG_IF(WARNING, pending_seq_updates_.find(seq_begin) != pending_seq_updates_.end())
1808 << "Already have pending updates with seq = " << seq_begin << ", but receive it again from " << source;
1809
1810 pending_seq_updates_.emplace(
1811 seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), mpas.get_promise()));
1812 set_seq_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
1813 lock.set_value(Unit());
1814 }
1815
on_pending_updates_processed(Result<Unit> result,Promise<Unit> promise)1816 void UpdatesManager::on_pending_updates_processed(Result<Unit> result, Promise<Unit> promise) {
1817 promise.set_result(std::move(result));
1818 }
1819
add_pending_qts_update(tl_object_ptr<telegram_api::Update> && update,int32 qts,Promise<Unit> && promise)1820 void UpdatesManager::add_pending_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts,
1821 Promise<Unit> &&promise) {
1822 CHECK(update != nullptr);
1823 if (qts <= 1) {
1824 LOG(ERROR) << "Receive wrong qts " << qts << " in " << oneline(to_string(update));
1825 schedule_get_difference("wrong qts");
1826 promise.set_value(Unit());
1827 return;
1828 }
1829
1830 int32 old_qts = get_qts();
1831 LOG(INFO) << "Process update with qts = " << qts << ", current qts = " << old_qts;
1832 if (qts < old_qts - 100001) {
1833 LOG(WARNING) << "Restore qts after qts overflow from " << old_qts << " to " << qts << " by "
1834 << oneline(to_string(update));
1835 add_qts(qts - 1).set_value(Unit());
1836 CHECK(get_qts() == qts - 1);
1837 old_qts = qts - 1;
1838 last_get_difference_qts_ = get_qts();
1839 }
1840
1841 if (qts <= old_qts) {
1842 LOG(INFO) << "Skip already applied update with qts = " << qts;
1843 promise.set_value(Unit());
1844 return;
1845 }
1846
1847 if (running_get_difference_ || (qts - 1 > old_qts && old_qts > 0)) {
1848 LOG(INFO) << "Postpone update with qts = " << qts;
1849 if (!running_get_difference_ && pending_qts_updates_.empty()) {
1850 set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME);
1851 }
1852 auto &pending_update = pending_qts_updates_[qts];
1853 if (pending_update.update != nullptr) {
1854 LOG(WARNING) << "Receive duplicate update with qts = " << qts;
1855 } else {
1856 pending_update.receive_time = Time::now();
1857 }
1858 pending_update.update = std::move(update);
1859 pending_update.promises.push_back(std::move(promise));
1860 return;
1861 }
1862
1863 process_qts_update(std::move(update), qts, std::move(promise));
1864 process_pending_qts_updates();
1865 }
1866
process_updates(vector<tl_object_ptr<telegram_api::Update>> && updates,bool force_apply,Promise<Unit> && promise)1867 void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply,
1868 Promise<Unit> &&promise) {
1869 tl_object_ptr<telegram_api::updatePtsChanged> update_pts_changed;
1870
1871 MultiPromiseActorSafe mpas{"OnProcessUpdatesMultiPromiseActor"};
1872 mpas.add_promise(std::move(promise));
1873 auto lock = mpas.get_promise();
1874
1875 /*
1876 for (auto &update : updates) {
1877 if (update != nullptr) {
1878 // TODO can't apply it here, because dialog may not be created yet
1879 // process updateReadChannelInbox before updateNewChannelMessage
1880 auto constructor_id = update->get_id();
1881 if (constructor_id == telegram_api::updateReadChannelInbox::ID) {
1882 on_update(move_tl_object_as<telegram_api::updateReadChannelInbox>(update), mpas.get_promise());
1883 }
1884 }
1885 }
1886 */
1887 for (auto &update : updates) {
1888 if (update != nullptr) {
1889 // process updateNewChannelMessage first
1890 auto constructor_id = update->get_id();
1891 if (constructor_id == telegram_api::updateNewChannelMessage::ID) {
1892 on_update(move_tl_object_as<telegram_api::updateNewChannelMessage>(update), mpas.get_promise());
1893 continue;
1894 }
1895
1896 // process updateNewScheduledMessage first
1897 if (constructor_id == telegram_api::updateNewScheduledMessage::ID) {
1898 on_update(move_tl_object_as<telegram_api::updateNewScheduledMessage>(update), mpas.get_promise());
1899 continue;
1900 }
1901
1902 // updateGroupCallConnection must be processed before updateGroupCall
1903 if (constructor_id == telegram_api::updateGroupCallConnection::ID) {
1904 on_update(move_tl_object_as<telegram_api::updateGroupCallConnection>(update), mpas.get_promise());
1905 continue;
1906 }
1907
1908 // updatePtsChanged forces get difference, so process it last
1909 if (constructor_id == telegram_api::updatePtsChanged::ID) {
1910 update_pts_changed = move_tl_object_as<telegram_api::updatePtsChanged>(update);
1911 continue;
1912 }
1913 }
1914 }
1915 if (force_apply) {
1916 for (auto &update : updates) {
1917 if (update != nullptr) {
1918 if (is_pts_update(update.get())) {
1919 auto constructor_id = update->get_id();
1920 if (constructor_id == telegram_api::updateWebPage::ID) {
1921 auto update_web_page = move_tl_object_as<telegram_api::updateWebPage>(update);
1922 td_->web_pages_manager_->on_get_web_page(std::move(update_web_page->webpage_), DialogId());
1923 continue;
1924 }
1925
1926 CHECK(constructor_id != telegram_api::updateFolderPeers::ID);
1927
1928 if (constructor_id == telegram_api::updateReadHistoryInbox::ID) {
1929 static_cast<telegram_api::updateReadHistoryInbox *>(update.get())->still_unread_count_ = -1;
1930 }
1931
1932 process_pts_update(std::move(update));
1933 } else if (is_qts_update(update.get())) {
1934 process_qts_update(std::move(update), 0, mpas.get_promise());
1935 } else if (update->get_id() == telegram_api::updateChannelTooLong::ID) {
1936 td_->messages_manager_->on_update_channel_too_long(
1937 move_tl_object_as<telegram_api::updateChannelTooLong>(update), true);
1938 }
1939 }
1940 }
1941 }
1942 for (auto &update : updates) {
1943 if (update != nullptr) {
1944 LOG(INFO) << "Process update " << to_string(update);
1945 downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
1946 CHECK(!running_get_difference_);
1947 }
1948 }
1949 if (update_pts_changed != nullptr) {
1950 on_update(std::move(update_pts_changed), mpas.get_promise());
1951 }
1952 lock.set_value(Unit());
1953 }
1954
process_pts_update(tl_object_ptr<telegram_api::Update> && update)1955 void UpdatesManager::process_pts_update(tl_object_ptr<telegram_api::Update> &&update) {
1956 CHECK(update != nullptr);
1957
1958 // TODO need to save all updates that can change result of running queries not associated with pts (for example
1959 // getHistory) and apply the updates to results of the queries
1960
1961 if (!check_pts_update(update)) {
1962 LOG(ERROR) << "Receive wrong pts update: " << oneline(to_string(update));
1963 return;
1964 }
1965
1966 // must be called only during getDifference
1967 CHECK(pending_pts_updates_.empty());
1968 CHECK(accumulated_pts_ == -1);
1969
1970 td_->messages_manager_->process_pts_update(std::move(update));
1971 }
1972
add_pending_pts_update(tl_object_ptr<telegram_api::Update> && update,int32 new_pts,int32 pts_count,double receive_time,Promise<Unit> && promise,const char * source)1973 void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
1974 int32 pts_count, double receive_time, Promise<Unit> &&promise,
1975 const char *source) {
1976 // do not try to run getDifference from this function
1977 CHECK(update != nullptr);
1978 CHECK(source != nullptr);
1979 LOG(INFO) << "Receive from " << source << " pending " << to_string(update);
1980 if (pts_count < 0 || new_pts <= pts_count) {
1981 LOG(ERROR) << "Receive update with wrong pts = " << new_pts << " or pts_count = " << pts_count << " from " << source
1982 << ": " << oneline(to_string(update));
1983 return promise.set_value(Unit());
1984 }
1985
1986 // TODO need to save all updates that can change result of running queries not associated with pts (for example
1987 // getHistory) and apply them to result of this queries
1988
1989 if (!check_pts_update(update)) {
1990 LOG(ERROR) << "Receive wrong pts update from " << source << ": " << oneline(to_string(update));
1991 return promise.set_value(Unit());
1992 }
1993
1994 if (DROP_PTS_UPDATES) {
1995 set_pts_gap_timeout(1.0);
1996 return promise.set_value(Unit());
1997 }
1998
1999 int32 old_pts = get_pts();
2000 if (new_pts < old_pts - 99 && Slice(source) != "after get difference") {
2001 bool need_restore_pts = new_pts < old_pts - 19999;
2002 auto now = Time::now();
2003 if (now > last_pts_jump_warning_time_ + 1 && (need_restore_pts || now < last_pts_jump_warning_time_ + 5)) {
2004 LOG(ERROR) << "Restore pts after delete_first_messages from " << old_pts << " to " << new_pts
2005 << " is disabled, pts_count = " << pts_count << ", update is from " << source << ": "
2006 << oneline(to_string(update));
2007 last_pts_jump_warning_time_ = now;
2008 }
2009 if (need_restore_pts) {
2010 set_pts_gap_timeout(0.001);
2011
2012 /*
2013 LOG(WARNING) << "Restore pts after delete_first_messages";
2014 set_pts(new_pts - 1, "restore pts after delete_first_messages");
2015 old_pts = get_pts();
2016 CHECK(old_pts == new_pts - 1);
2017 */
2018 }
2019 }
2020
2021 if (new_pts <= old_pts || (old_pts >= 1 && new_pts - (1 << 30) > old_pts)) {
2022 td_->messages_manager_->skip_old_pending_pts_update(std::move(update), new_pts, old_pts, pts_count, source);
2023 return promise.set_value(Unit());
2024 }
2025
2026 if (running_get_difference_ || !postponed_pts_updates_.empty()) {
2027 LOG(INFO) << "Save pending update got while running getDifference from " << source;
2028 postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise));
2029 return;
2030 }
2031
2032 if (old_pts > new_pts - pts_count) {
2033 LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts
2034 << "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from "
2035 << source << " = " << oneline(to_string(update));
2036 postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise));
2037 set_pts_gap_timeout(0.001);
2038 return;
2039 }
2040
2041 accumulated_pts_count_ += pts_count;
2042 if (new_pts > accumulated_pts_) {
2043 accumulated_pts_ = new_pts;
2044 }
2045
2046 if (old_pts > accumulated_pts_ - accumulated_pts_count_) {
2047 LOG(WARNING) << "Have old_pts (= " << old_pts << ") + accumulated_pts_count (= " << accumulated_pts_count_
2048 << ") > accumulated_pts (= " << accumulated_pts_ << "). new_pts = " << new_pts
2049 << ", pts_count = " << pts_count << ". Logged in "
2050 << G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = "
2051 << oneline(to_string(update));
2052 postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise));
2053 set_pts_gap_timeout(0.001);
2054 return;
2055 }
2056
2057 LOG_IF(INFO, pts_count == 0 && update->get_id() != dummyUpdate::ID) << "Skip useless update " << to_string(update);
2058
2059 if (pending_pts_updates_.empty() && old_pts == accumulated_pts_ - accumulated_pts_count_ &&
2060 !pts_gap_timeout_.has_timeout()) {
2061 if (pts_count > 0) {
2062 td_->messages_manager_->process_pts_update(std::move(update));
2063
2064 set_pts(accumulated_pts_, "process pending updates fast path")
2065 .set_value(Unit()); // TODO can't set until data are really stored on persistent storage
2066 accumulated_pts_count_ = 0;
2067 accumulated_pts_ = -1;
2068 }
2069 promise.set_value(Unit());
2070 return;
2071 }
2072
2073 pending_pts_updates_.emplace(
2074 new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, receive_time, std::move(promise)));
2075
2076 if (old_pts < accumulated_pts_ - accumulated_pts_count_) {
2077 if (old_pts == new_pts - pts_count) {
2078 // can't apply all updates, but can apply this and probably some other updates
2079 process_pending_pts_updates();
2080 } else {
2081 set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
2082 }
2083 return;
2084 }
2085
2086 CHECK(old_pts == accumulated_pts_ - accumulated_pts_count_);
2087 process_all_pending_pts_updates();
2088 }
2089
postpone_pts_update(tl_object_ptr<telegram_api::Update> && update,int32 pts,int32 pts_count,double receive_time,Promise<Unit> && promise)2090 void UpdatesManager::postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
2091 double receive_time, Promise<Unit> &&promise) {
2092 postponed_pts_updates_.emplace(pts,
2093 PendingPtsUpdate(std::move(update), pts, pts_count, receive_time, std::move(promise)));
2094 }
2095
process_seq_updates(int32 seq_end,int32 date,vector<tl_object_ptr<telegram_api::Update>> && updates,Promise<Unit> && promise)2096 void UpdatesManager::process_seq_updates(int32 seq_end, int32 date,
2097 vector<tl_object_ptr<telegram_api::Update>> &&updates,
2098 Promise<Unit> &&promise) {
2099 string serialized_updates = PSTRING() << "process_seq_updates [seq_ = " << seq_ << ", seq_end = " << seq_end << "]: ";
2100 // TODO remove after bugs will be fixed
2101 for (auto &update : updates) {
2102 if (update != nullptr) {
2103 serialized_updates += oneline(to_string(update));
2104 }
2105 }
2106 process_updates(std::move(updates), false, std::move(promise));
2107 if (seq_end) {
2108 seq_ = seq_end;
2109 }
2110 if (date && seq_end) {
2111 set_date(date, true, std::move(serialized_updates));
2112 }
2113 }
2114
process_qts_update(tl_object_ptr<telegram_api::Update> && update_ptr,int32 qts,Promise<Unit> && promise)2115 void UpdatesManager::process_qts_update(tl_object_ptr<telegram_api::Update> &&update_ptr, int32 qts,
2116 Promise<Unit> &&promise) {
2117 LOG(DEBUG) << "Process " << to_string(update_ptr);
2118 if (last_get_difference_qts_ < qts - FORCED_GET_DIFFERENCE_PTS_DIFF) {
2119 if (last_get_difference_qts_ != 0) {
2120 schedule_get_difference("rare qts getDifference");
2121 }
2122 last_get_difference_qts_ = qts;
2123 }
2124 switch (update_ptr->get_id()) {
2125 case telegram_api::updateNewEncryptedMessage::ID: {
2126 auto update = move_tl_object_as<telegram_api::updateNewEncryptedMessage>(update_ptr);
2127 send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(update->message_),
2128 add_qts(qts));
2129 break;
2130 }
2131 case telegram_api::updateMessagePollVote::ID: {
2132 auto update = move_tl_object_as<telegram_api::updateMessagePollVote>(update_ptr);
2133 td_->poll_manager_->on_get_poll_vote(PollId(update->poll_id_), UserId(update->user_id_),
2134 std::move(update->options_));
2135 add_qts(qts).set_value(Unit());
2136 break;
2137 }
2138 case telegram_api::updateBotStopped::ID: {
2139 auto update = move_tl_object_as<telegram_api::updateBotStopped>(update_ptr);
2140 td_->contacts_manager_->on_update_bot_stopped(UserId(update->user_id_), update->date_, update->stopped_);
2141 add_qts(qts).set_value(Unit());
2142 break;
2143 }
2144 case telegram_api::updateChatParticipant::ID: {
2145 auto update = move_tl_object_as<telegram_api::updateChatParticipant>(update_ptr);
2146 td_->contacts_manager_->on_update_chat_participant(ChatId(update->chat_id_), UserId(update->actor_id_),
2147 update->date_, DialogInviteLink(std::move(update->invite_)),
2148 std::move(update->prev_participant_),
2149 std::move(update->new_participant_));
2150 add_qts(qts).set_value(Unit());
2151 break;
2152 }
2153 case telegram_api::updateChannelParticipant::ID: {
2154 auto update = move_tl_object_as<telegram_api::updateChannelParticipant>(update_ptr);
2155 td_->contacts_manager_->on_update_channel_participant(ChannelId(update->channel_id_), UserId(update->actor_id_),
2156 update->date_, DialogInviteLink(std::move(update->invite_)),
2157 std::move(update->prev_participant_),
2158 std::move(update->new_participant_));
2159 add_qts(qts).set_value(Unit());
2160 break;
2161 }
2162 case telegram_api::updateBotChatInviteRequester::ID: {
2163 auto update = move_tl_object_as<telegram_api::updateBotChatInviteRequester>(update_ptr);
2164 td_->contacts_manager_->on_update_chat_invite_requester(DialogId(update->peer_), UserId(update->user_id_),
2165 std::move(update->about_), update->date_,
2166 DialogInviteLink(std::move(update->invite_)));
2167 add_qts(qts).set_value(Unit());
2168 break;
2169 }
2170 default:
2171 UNREACHABLE();
2172 break;
2173 }
2174 promise.set_value(Unit());
2175 }
2176
process_all_pending_pts_updates()2177 void UpdatesManager::process_all_pending_pts_updates() {
2178 auto begin_time = Time::now();
2179 for (auto &update : pending_pts_updates_) {
2180 td_->messages_manager_->process_pts_update(std::move(update.second.update));
2181 update.second.promise.set_value(Unit());
2182 }
2183
2184 if (last_pts_gap_time_ != 0) {
2185 auto begin_diff = begin_time - last_pts_gap_time_;
2186 auto diff = Time::now() - last_pts_gap_time_;
2187 last_pts_gap_time_ = 0;
2188 if (diff > 0.1) {
2189 VLOG(get_difference) << "Gap in pts from " << accumulated_pts_ - accumulated_pts_count_ << " to "
2190 << accumulated_pts_ << " has been filled in " << begin_diff << '-' << diff << " seconds";
2191 }
2192 }
2193
2194 set_pts(accumulated_pts_, "process_all_pending_pts_updates")
2195 .set_value(Unit()); // TODO can't set until updates are stored on persistent storage
2196 drop_all_pending_pts_updates();
2197 }
2198
drop_all_pending_pts_updates()2199 void UpdatesManager::drop_all_pending_pts_updates() {
2200 accumulated_pts_count_ = 0;
2201 accumulated_pts_ = -1;
2202 pts_gap_timeout_.cancel_timeout();
2203 pending_pts_updates_.clear();
2204 }
2205
process_postponed_pts_updates()2206 void UpdatesManager::process_postponed_pts_updates() {
2207 if (postponed_pts_updates_.empty()) {
2208 return;
2209 }
2210
2211 auto initial_pts = get_pts();
2212 auto old_pts = initial_pts;
2213 int32 skipped_update_count = 0;
2214 int32 applied_update_count = 0;
2215 auto update_it = postponed_pts_updates_.begin();
2216 while (update_it != postponed_pts_updates_.end()) {
2217 auto new_pts = update_it->second.pts;
2218 auto pts_count = update_it->second.pts_count;
2219 if (new_pts <= old_pts || (old_pts >= 1 && new_pts - (1 << 30) > old_pts)) {
2220 skipped_update_count++;
2221 td_->messages_manager_->skip_old_pending_pts_update(std::move(update_it->second.update), new_pts, old_pts,
2222 pts_count, "process_postponed_pts_updates");
2223 update_it->second.promise.set_value(Unit());
2224 update_it = postponed_pts_updates_.erase(update_it);
2225 continue;
2226 }
2227
2228 auto last_update_it = update_it;
2229 for (int32 i = 1; true; i++) {
2230 ++last_update_it;
2231 if (old_pts == new_pts - pts_count) {
2232 // the updates can be applied
2233 break;
2234 }
2235 if (old_pts > new_pts - pts_count || last_update_it == postponed_pts_updates_.end() ||
2236 i == GAP_TIMEOUT_UPDATE_COUNT) {
2237 // the updates can't be applied
2238 VLOG(get_difference) << "Can't apply " << i << " next postponed updates with pts " << update_it->second.pts
2239 << '-' << new_pts << ", because their pts_count is " << pts_count
2240 << " instead of expected " << new_pts - old_pts;
2241 last_update_it = update_it;
2242 break;
2243 }
2244
2245 new_pts = last_update_it->second.pts;
2246 pts_count += last_update_it->second.pts_count;
2247 }
2248
2249 if (last_update_it == update_it) {
2250 // the updates will be applied or skipped later
2251 break;
2252 }
2253 CHECK(old_pts == new_pts - pts_count);
2254
2255 while (update_it != last_update_it) {
2256 if (update_it->second.pts_count > 0) {
2257 applied_update_count++;
2258 td_->messages_manager_->process_pts_update(std::move(update_it->second.update));
2259 }
2260 update_it->second.promise.set_value(Unit());
2261 update_it = postponed_pts_updates_.erase(update_it);
2262 }
2263 old_pts = new_pts;
2264 }
2265 if (old_pts != initial_pts) {
2266 set_pts(old_pts, "process_postponed_pts_updates")
2267 .set_value(Unit()); // TODO can't set until data are really stored on persistent storage
2268 }
2269 CHECK(!running_get_difference_);
2270 if (skipped_update_count + applied_update_count > 0) {
2271 VLOG(get_difference) << "Pts has changed from " << initial_pts << " to " << old_pts << " after skipping "
2272 << skipped_update_count << ", applying " << applied_update_count << " and keeping "
2273 << postponed_pts_updates_.size() << " postponed updates";
2274 }
2275 }
2276
process_pending_pts_updates()2277 void UpdatesManager::process_pending_pts_updates() {
2278 if (pending_pts_updates_.empty()) {
2279 return;
2280 }
2281
2282 bool processed_pending_update = false;
2283 while (!pending_pts_updates_.empty()) {
2284 auto update_it = pending_pts_updates_.begin();
2285 auto &update = update_it->second;
2286 if (get_pts() != update.pts - update.pts_count) {
2287 // the updates will be applied or skipped later
2288 break;
2289 }
2290
2291 processed_pending_update = true;
2292 if (update.pts_count > 0) {
2293 td_->messages_manager_->process_pts_update(std::move(update.update));
2294 set_pts(update.pts, "process_pending_pts_updates")
2295 .set_value(Unit()); // TODO can't set until data are really stored on persistent storage
2296
2297 if (accumulated_pts_ != -1) {
2298 CHECK(update.pts <= accumulated_pts_);
2299 CHECK(accumulated_pts_count_ >= update.pts_count);
2300 accumulated_pts_count_ -= update.pts_count;
2301 }
2302 }
2303 update.promise.set_value(Unit());
2304 pending_pts_updates_.erase(update_it);
2305 }
2306 if (processed_pending_update) {
2307 pts_gap_timeout_.cancel_timeout();
2308 }
2309 if (!pending_pts_updates_.empty()) {
2310 // if still have a gap, reset timeout
2311 auto update_it = pending_pts_updates_.begin();
2312 double receive_time = update_it->second.receive_time;
2313 for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
2314 if (++update_it == pending_pts_updates_.end()) {
2315 break;
2316 }
2317 receive_time = min(receive_time, update_it->second.receive_time);
2318 }
2319 set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
2320 }
2321 }
2322
process_pending_seq_updates()2323 void UpdatesManager::process_pending_seq_updates() {
2324 if (!pending_seq_updates_.empty()) {
2325 LOG(DEBUG) << "Trying to process " << pending_seq_updates_.size() << " pending seq updates";
2326 // must not return, because in case of seq overflow there are no pending seq updates
2327 }
2328
2329 bool processed_pending_update = false;
2330 while (!pending_seq_updates_.empty() && !running_get_difference_) {
2331 auto update_it = pending_seq_updates_.begin();
2332 auto &update = update_it->second;
2333 auto seq_begin = update.seq_begin;
2334 if (seq_begin - 1 > seq_ && seq_begin - (1 << 30) <= seq_) {
2335 // the updates will be applied later
2336 break;
2337 }
2338
2339 processed_pending_update = true;
2340 auto seq_end = update.seq_end;
2341 if (seq_begin - 1 == seq_) {
2342 process_seq_updates(seq_end, update.date, std::move(update.updates), std::move(update.promise));
2343 } else {
2344 // old update
2345 CHECK(seq_begin != 0);
2346 if (seq_begin <= seq_ && seq_ < seq_end) {
2347 LOG(ERROR) << "Receive updates with seq_begin = " << seq_begin << ", seq_end = " << seq_end
2348 << ", but seq = " << seq_;
2349 }
2350 update.promise.set_value(Unit());
2351 }
2352 pending_seq_updates_.erase(update_it);
2353 }
2354 if (pending_seq_updates_.empty() || processed_pending_update) {
2355 seq_gap_timeout_.cancel_timeout();
2356 }
2357 if (!pending_seq_updates_.empty()) {
2358 // if still have a gap, reset timeout
2359 auto update_it = pending_seq_updates_.begin();
2360 double receive_time = update_it->second.receive_time;
2361 for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
2362 if (++update_it == pending_seq_updates_.end()) {
2363 break;
2364 }
2365 receive_time = min(receive_time, update_it->second.receive_time);
2366 }
2367 set_seq_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
2368 }
2369 }
2370
process_pending_qts_updates()2371 void UpdatesManager::process_pending_qts_updates() {
2372 if (pending_qts_updates_.empty()) {
2373 return;
2374 }
2375
2376 LOG(DEBUG) << "Process " << pending_qts_updates_.size() << " pending qts updates";
2377 bool processed_pending_update = false;
2378 while (!pending_qts_updates_.empty()) {
2379 CHECK(!running_get_difference_);
2380 auto update_it = pending_qts_updates_.begin();
2381 auto qts = update_it->first;
2382 auto old_qts = get_qts();
2383 if (qts - 1 > old_qts && qts - (1 << 30) <= old_qts) {
2384 // the update will be applied later
2385 break;
2386 }
2387 auto promise = PromiseCreator::lambda([promises = std::move(update_it->second.promises)](Unit) mutable {
2388 for (auto &promise : promises) {
2389 promise.set_value(Unit());
2390 }
2391 });
2392 processed_pending_update = true;
2393 if (qts == old_qts + 1) {
2394 process_qts_update(std::move(update_it->second.update), qts, std::move(promise));
2395 } else {
2396 promise.set_value(Unit());
2397 }
2398 pending_qts_updates_.erase(update_it);
2399 }
2400
2401 if (processed_pending_update) {
2402 qts_gap_timeout_.cancel_timeout();
2403 }
2404 if (!pending_qts_updates_.empty()) {
2405 // if still have a gap, reset timeout
2406 auto update_it = pending_qts_updates_.begin();
2407 double receive_time = update_it->second.receive_time;
2408 for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
2409 if (++update_it == pending_qts_updates_.end()) {
2410 break;
2411 }
2412 receive_time = min(receive_time, update_it->second.receive_time);
2413 }
2414 set_qts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
2415 }
2416 CHECK(!running_get_difference_);
2417 }
2418
set_pts_gap_timeout(double timeout)2419 void UpdatesManager::set_pts_gap_timeout(double timeout) {
2420 if (!pts_gap_timeout_.has_timeout() || timeout < pts_gap_timeout_.get_timeout()) {
2421 pts_gap_timeout_.set_callback(std::move(fill_pts_gap));
2422 pts_gap_timeout_.set_callback_data(static_cast<void *>(td_));
2423 pts_gap_timeout_.set_timeout_in(timeout);
2424 last_pts_gap_time_ = Time::now();
2425 }
2426 }
2427
set_seq_gap_timeout(double timeout)2428 void UpdatesManager::set_seq_gap_timeout(double timeout) {
2429 if (!seq_gap_timeout_.has_timeout() || timeout < seq_gap_timeout_.get_timeout()) {
2430 seq_gap_timeout_.set_callback(std::move(fill_seq_gap));
2431 seq_gap_timeout_.set_callback_data(static_cast<void *>(td_));
2432 seq_gap_timeout_.set_timeout_in(timeout);
2433 }
2434 }
2435
set_qts_gap_timeout(double timeout)2436 void UpdatesManager::set_qts_gap_timeout(double timeout) {
2437 if (!qts_gap_timeout_.has_timeout() || timeout < qts_gap_timeout_.get_timeout()) {
2438 qts_gap_timeout_.set_callback(std::move(fill_qts_gap));
2439 qts_gap_timeout_.set_callback_data(static_cast<void *>(td_));
2440 qts_gap_timeout_.set_timeout_in(timeout);
2441 }
2442 }
2443
on_pending_update(tl_object_ptr<telegram_api::Update> update,int32 seq,Promise<Unit> && promise,const char * source)2444 void UpdatesManager::on_pending_update(tl_object_ptr<telegram_api::Update> update, int32 seq, Promise<Unit> &&promise,
2445 const char *source) {
2446 vector<tl_object_ptr<telegram_api::Update>> updates;
2447 updates.push_back(std::move(update));
2448 on_pending_updates(std::move(updates), seq, seq, 0, Time::now(), std::move(promise), source);
2449 }
2450
on_update(tl_object_ptr<telegram_api::updateNewMessage> update,Promise<Unit> && promise)2451 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewMessage> update, Promise<Unit> &&promise) {
2452 int new_pts = update->pts_;
2453 int pts_count = update->pts_count_;
2454 add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), "updateNewMessage");
2455 }
2456
on_update(tl_object_ptr<telegram_api::updateNewChannelMessage> update,Promise<Unit> && promise)2457 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewChannelMessage> update, Promise<Unit> &&promise) {
2458 DialogId dialog_id = MessagesManager::get_message_dialog_id(update->message_);
2459 int new_pts = update->pts_;
2460 int pts_count = update->pts_count_;
2461 td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
2462 std::move(promise), "updateNewChannelMessage");
2463 }
2464
on_update(tl_object_ptr<telegram_api::updateMessageID> update,Promise<Unit> && promise)2465 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateMessageID> update, Promise<Unit> &&promise) {
2466 LOG(ERROR) << "Receive not in getDifference and not in on_pending_updates " << to_string(update);
2467 }
2468
on_update(tl_object_ptr<telegram_api::updateReadMessagesContents> update,Promise<Unit> && promise)2469 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadMessagesContents> update,
2470 Promise<Unit> &&promise) {
2471 int new_pts = update->pts_;
2472 int pts_count = update->pts_count_;
2473 add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
2474 "updateReadMessagesContents");
2475 }
2476
on_update(tl_object_ptr<telegram_api::updateEditMessage> update,Promise<Unit> && promise)2477 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditMessage> update, Promise<Unit> &&promise) {
2478 int new_pts = update->pts_;
2479 int pts_count = update->pts_count_;
2480 add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), "updateEditMessage");
2481 }
2482
on_update(tl_object_ptr<telegram_api::updateDeleteMessages> update,Promise<Unit> && promise)2483 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteMessages> update, Promise<Unit> &&promise) {
2484 int new_pts = update->pts_;
2485 int pts_count = update->pts_count_;
2486 if (update->messages_.empty()) {
2487 add_pending_pts_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, Time::now(), Promise<Unit>(),
2488 "updateDeleteMessages");
2489 promise.set_value(Unit());
2490 } else {
2491 add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
2492 "updateDeleteMessages");
2493 }
2494 }
2495
on_update(tl_object_ptr<telegram_api::updateReadHistoryInbox> update,Promise<Unit> && promise)2496 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryInbox> update, Promise<Unit> &&promise) {
2497 int new_pts = update->pts_;
2498 int pts_count = update->pts_count_;
2499 add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
2500 "updateReadHistoryInbox");
2501 }
2502
on_update(tl_object_ptr<telegram_api::updateReadHistoryOutbox> update,Promise<Unit> && promise)2503 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryOutbox> update, Promise<Unit> &&promise) {
2504 int new_pts = update->pts_;
2505 int pts_count = update->pts_count_;
2506 add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
2507 "updateReadHistoryOutbox");
2508 }
2509
on_update(tl_object_ptr<telegram_api::updateServiceNotification> update,Promise<Unit> && promise)2510 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateServiceNotification> update, Promise<Unit> &&promise) {
2511 td_->messages_manager_->on_update_service_notification(std::move(update), true, Promise<Unit>());
2512 promise.set_value(Unit());
2513 }
2514
on_update(tl_object_ptr<telegram_api::updateChat> update,Promise<Unit> && promise)2515 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChat> update, Promise<Unit> &&promise) {
2516 td_->messages_manager_->on_dialog_info_full_invalidated(DialogId(ChatId(update->chat_id_)));
2517 promise.set_value(Unit());
2518 }
2519
on_update(tl_object_ptr<telegram_api::updateReadChannelInbox> update,Promise<Unit> && promise)2520 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadChannelInbox> update, Promise<Unit> &&promise) {
2521 td_->messages_manager_->on_update_read_channel_inbox(std::move(update));
2522 promise.set_value(Unit());
2523 }
2524
on_update(tl_object_ptr<telegram_api::updateReadChannelOutbox> update,Promise<Unit> && promise)2525 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadChannelOutbox> update, Promise<Unit> &&promise) {
2526 td_->messages_manager_->on_update_read_channel_outbox(std::move(update));
2527 promise.set_value(Unit());
2528 }
2529
on_update(tl_object_ptr<telegram_api::updateChannelReadMessagesContents> update,Promise<Unit> && promise)2530 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelReadMessagesContents> update,
2531 Promise<Unit> &&promise) {
2532 td_->messages_manager_->on_update_read_channel_messages_contents(std::move(update));
2533 promise.set_value(Unit());
2534 }
2535
on_update(tl_object_ptr<telegram_api::updateChannelTooLong> update,Promise<Unit> && promise)2536 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelTooLong> update, Promise<Unit> &&promise) {
2537 td_->messages_manager_->on_update_channel_too_long(std::move(update), false);
2538 promise.set_value(Unit());
2539 }
2540
on_update(tl_object_ptr<telegram_api::updateChannel> update,Promise<Unit> && promise)2541 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannel> update, Promise<Unit> &&promise) {
2542 td_->contacts_manager_->invalidate_channel_full(ChannelId(update->channel_id_), false);
2543 promise.set_value(Unit());
2544 }
2545
on_update(tl_object_ptr<telegram_api::updateEditChannelMessage> update,Promise<Unit> && promise)2546 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditChannelMessage> update, Promise<Unit> &&promise) {
2547 DialogId dialog_id = MessagesManager::get_message_dialog_id(update->message_);
2548 int new_pts = update->pts_;
2549 int pts_count = update->pts_count_;
2550 td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
2551 std::move(promise), "updateEditChannelMessage");
2552 }
2553
on_update(tl_object_ptr<telegram_api::updateDeleteChannelMessages> update,Promise<Unit> && promise)2554 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteChannelMessages> update,
2555 Promise<Unit> &&promise) {
2556 DialogId dialog_id(ChannelId(update->channel_id_));
2557 int new_pts = update->pts_;
2558 int pts_count = update->pts_count_;
2559 td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
2560 std::move(promise), "updateDeleteChannelMessages");
2561 }
2562
on_update(tl_object_ptr<telegram_api::updateChannelMessageViews> update,Promise<Unit> && promise)2563 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelMessageViews> update, Promise<Unit> &&promise) {
2564 ChannelId channel_id(update->channel_id_);
2565 if (!channel_id.is_valid()) {
2566 LOG(ERROR) << "Receive invalid " << channel_id;
2567 } else {
2568 DialogId dialog_id(channel_id);
2569 td_->messages_manager_->on_update_message_view_count({dialog_id, MessageId(ServerMessageId(update->id_))},
2570 update->views_);
2571 }
2572 promise.set_value(Unit());
2573 }
2574
on_update(tl_object_ptr<telegram_api::updateChannelMessageForwards> update,Promise<Unit> && promise)2575 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelMessageForwards> update,
2576 Promise<Unit> &&promise) {
2577 ChannelId channel_id(update->channel_id_);
2578 if (!channel_id.is_valid()) {
2579 LOG(ERROR) << "Receive invalid " << channel_id;
2580 } else {
2581 DialogId dialog_id(channel_id);
2582 td_->messages_manager_->on_update_message_forward_count({dialog_id, MessageId(ServerMessageId(update->id_))},
2583 update->forwards_);
2584 }
2585 promise.set_value(Unit());
2586 }
2587
on_update(tl_object_ptr<telegram_api::updateChannelAvailableMessages> update,Promise<Unit> && promise)2588 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelAvailableMessages> update,
2589 Promise<Unit> &&promise) {
2590 td_->messages_manager_->on_update_channel_max_unavailable_message_id(
2591 ChannelId(update->channel_id_), MessageId(ServerMessageId(update->available_min_id_)));
2592 promise.set_value(Unit());
2593 }
2594
on_update(tl_object_ptr<telegram_api::updateReadChannelDiscussionInbox> update,Promise<Unit> && promise)2595 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadChannelDiscussionInbox> update,
2596 Promise<Unit> &&promise) {
2597 td_->messages_manager_->on_update_read_message_comments(
2598 DialogId(ChannelId(update->channel_id_)), MessageId(ServerMessageId(update->top_msg_id_)), MessageId(),
2599 MessageId(ServerMessageId(update->read_max_id_)), MessageId());
2600 if ((update->flags_ & telegram_api::updateReadChannelDiscussionInbox::BROADCAST_ID_MASK) != 0) {
2601 td_->messages_manager_->on_update_read_message_comments(
2602 DialogId(ChannelId(update->broadcast_id_)), MessageId(ServerMessageId(update->broadcast_post_)), MessageId(),
2603 MessageId(ServerMessageId(update->read_max_id_)), MessageId());
2604 }
2605 promise.set_value(Unit());
2606 }
2607
on_update(tl_object_ptr<telegram_api::updateReadChannelDiscussionOutbox> update,Promise<Unit> && promise)2608 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadChannelDiscussionOutbox> update,
2609 Promise<Unit> &&promise) {
2610 td_->messages_manager_->on_update_read_message_comments(
2611 DialogId(ChannelId(update->channel_id_)), MessageId(ServerMessageId(update->top_msg_id_)), MessageId(),
2612 MessageId(), MessageId(ServerMessageId(update->read_max_id_)));
2613 promise.set_value(Unit());
2614 }
2615
on_update(tl_object_ptr<telegram_api::updatePinnedMessages> update,Promise<Unit> && promise)2616 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedMessages> update, Promise<Unit> &&promise) {
2617 int new_pts = update->pts_;
2618 int pts_count = update->pts_count_;
2619 add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
2620 "updatePinnedMessages");
2621 }
2622
on_update(tl_object_ptr<telegram_api::updatePinnedChannelMessages> update,Promise<Unit> && promise)2623 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedChannelMessages> update,
2624 Promise<Unit> &&promise) {
2625 DialogId dialog_id(ChannelId(update->channel_id_));
2626 int new_pts = update->pts_;
2627 int pts_count = update->pts_count_;
2628 td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
2629 std::move(promise), "updatePinnedChannelMessages");
2630 }
2631
on_update(tl_object_ptr<telegram_api::updateNotifySettings> update,Promise<Unit> && promise)2632 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNotifySettings> update, Promise<Unit> &&promise) {
2633 switch (update->peer_->get_id()) {
2634 case telegram_api::notifyPeer::ID: {
2635 DialogId dialog_id(static_cast<const telegram_api::notifyPeer *>(update->peer_.get())->peer_);
2636 if (dialog_id.is_valid()) {
2637 td_->messages_manager_->on_update_dialog_notify_settings(dialog_id, std::move(update->notify_settings_),
2638 "updateNotifySettings");
2639 } else {
2640 LOG(ERROR) << "Receive wrong " << to_string(update);
2641 }
2642 break;
2643 }
2644 case telegram_api::notifyUsers::ID:
2645 td_->messages_manager_->on_update_scope_notify_settings(NotificationSettingsScope::Private,
2646 std::move(update->notify_settings_));
2647 break;
2648 case telegram_api::notifyChats::ID:
2649 td_->messages_manager_->on_update_scope_notify_settings(NotificationSettingsScope::Group,
2650 std::move(update->notify_settings_));
2651 break;
2652 case telegram_api::notifyBroadcasts::ID:
2653 td_->messages_manager_->on_update_scope_notify_settings(NotificationSettingsScope::Channel,
2654 std::move(update->notify_settings_));
2655 break;
2656 default:
2657 UNREACHABLE();
2658 }
2659 promise.set_value(Unit());
2660 }
2661
on_update(tl_object_ptr<telegram_api::updatePeerSettings> update,Promise<Unit> && promise)2662 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePeerSettings> update, Promise<Unit> &&promise) {
2663 td_->messages_manager_->on_get_peer_settings(DialogId(update->peer_), std::move(update->settings_));
2664 promise.set_value(Unit());
2665 }
2666
on_update(tl_object_ptr<telegram_api::updatePeerHistoryTTL> update,Promise<Unit> && promise)2667 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePeerHistoryTTL> update, Promise<Unit> &&promise) {
2668 MessageTtlSetting message_ttl_setting;
2669 if ((update->flags_ & telegram_api::updatePeerHistoryTTL::TTL_PERIOD_MASK) != 0) {
2670 message_ttl_setting = MessageTtlSetting(update->ttl_period_);
2671 }
2672 td_->messages_manager_->on_update_dialog_message_ttl_setting(DialogId(update->peer_), message_ttl_setting);
2673 promise.set_value(Unit());
2674 }
2675
on_update(tl_object_ptr<telegram_api::updatePeerLocated> update,Promise<Unit> && promise)2676 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePeerLocated> update, Promise<Unit> &&promise) {
2677 td_->contacts_manager_->on_update_peer_located(std::move(update->peers_), true);
2678 promise.set_value(Unit());
2679 }
2680
on_update(tl_object_ptr<telegram_api::updateWebPage> update,Promise<Unit> && promise)2681 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateWebPage> update, Promise<Unit> &&promise) {
2682 td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId());
2683 add_pending_pts_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, Time::now(), Promise<Unit>(),
2684 "updateWebPage");
2685 promise.set_value(Unit());
2686 }
2687
on_update(tl_object_ptr<telegram_api::updateChannelWebPage> update,Promise<Unit> && promise)2688 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelWebPage> update, Promise<Unit> &&promise) {
2689 td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId());
2690 DialogId dialog_id(ChannelId(update->channel_id_));
2691 td_->messages_manager_->add_pending_channel_update(dialog_id, make_tl_object<dummyUpdate>(), update->pts_,
2692 update->pts_count_, Promise<Unit>(), "updateChannelWebPage");
2693 promise.set_value(Unit());
2694 }
2695
on_update(tl_object_ptr<telegram_api::updateFolderPeers> update,Promise<Unit> && promise)2696 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateFolderPeers> update, Promise<Unit> &&promise) {
2697 for (auto &folder_peer : update->folder_peers_) {
2698 DialogId dialog_id(folder_peer->peer_);
2699 FolderId folder_id(folder_peer->folder_id_);
2700 td_->messages_manager_->on_update_dialog_folder_id(dialog_id, folder_id);
2701 }
2702
2703 if (update->pts_ > 0) {
2704 add_pending_pts_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, Time::now(),
2705 Promise<Unit>(), "updateFolderPeers");
2706 }
2707 promise.set_value(Unit());
2708 }
2709
get_short_update_date() const2710 int32 UpdatesManager::get_short_update_date() const {
2711 int32 now = G()->unix_time();
2712 if (short_update_date_ > 0) {
2713 return min(short_update_date_, now);
2714 }
2715 return now;
2716 }
2717
have_update_pts_changed(const vector<tl_object_ptr<telegram_api::Update>> & updates)2718 bool UpdatesManager::have_update_pts_changed(const vector<tl_object_ptr<telegram_api::Update>> &updates) {
2719 for (auto &update : updates) {
2720 CHECK(update != nullptr);
2721 if (update->get_id() == telegram_api::updatePtsChanged::ID) {
2722 return true;
2723 }
2724 }
2725 return false;
2726 }
2727
check_pts_update_dialog_id(DialogId dialog_id)2728 bool UpdatesManager::check_pts_update_dialog_id(DialogId dialog_id) {
2729 switch (dialog_id.get_type()) {
2730 case DialogType::User:
2731 case DialogType::Chat:
2732 return true;
2733 case DialogType::Channel:
2734 case DialogType::SecretChat:
2735 case DialogType::None:
2736 return false;
2737 default:
2738 UNREACHABLE();
2739 return false;
2740 }
2741 }
2742
check_pts_update(const tl_object_ptr<telegram_api::Update> & update)2743 bool UpdatesManager::check_pts_update(const tl_object_ptr<telegram_api::Update> &update) {
2744 CHECK(update != nullptr);
2745 switch (update->get_id()) {
2746 case dummyUpdate::ID:
2747 case updateSentMessage::ID:
2748 case telegram_api::updateReadMessagesContents::ID:
2749 case telegram_api::updateDeleteMessages::ID:
2750 return true;
2751 case telegram_api::updateNewMessage::ID: {
2752 auto update_new_message = static_cast<const telegram_api::updateNewMessage *>(update.get());
2753 return check_pts_update_dialog_id(MessagesManager::get_message_dialog_id(update_new_message->message_));
2754 }
2755 case telegram_api::updateReadHistoryInbox::ID: {
2756 auto update_read_history_inbox = static_cast<const telegram_api::updateReadHistoryInbox *>(update.get());
2757 return check_pts_update_dialog_id(DialogId(update_read_history_inbox->peer_));
2758 }
2759 case telegram_api::updateReadHistoryOutbox::ID: {
2760 auto update_read_history_outbox = static_cast<const telegram_api::updateReadHistoryOutbox *>(update.get());
2761 return check_pts_update_dialog_id(DialogId(update_read_history_outbox->peer_));
2762 }
2763 case telegram_api::updateEditMessage::ID: {
2764 auto update_edit_message = static_cast<const telegram_api::updateEditMessage *>(update.get());
2765 return check_pts_update_dialog_id(MessagesManager::get_message_dialog_id(update_edit_message->message_));
2766 }
2767 case telegram_api::updatePinnedMessages::ID: {
2768 auto update_pinned_messages = static_cast<const telegram_api::updatePinnedMessages *>(update.get());
2769 return check_pts_update_dialog_id(DialogId(update_pinned_messages->peer_));
2770 }
2771 default:
2772 return false;
2773 }
2774 }
2775
is_pts_update(const telegram_api::Update * update)2776 bool UpdatesManager::is_pts_update(const telegram_api::Update *update) {
2777 switch (update->get_id()) {
2778 case telegram_api::updateNewMessage::ID:
2779 case telegram_api::updateReadMessagesContents::ID:
2780 case telegram_api::updateEditMessage::ID:
2781 case telegram_api::updateDeleteMessages::ID:
2782 case telegram_api::updateReadHistoryInbox::ID:
2783 case telegram_api::updateReadHistoryOutbox::ID:
2784 case telegram_api::updateWebPage::ID:
2785 case telegram_api::updatePinnedMessages::ID:
2786 case telegram_api::updateFolderPeers::ID:
2787 return true;
2788 default:
2789 return false;
2790 }
2791 }
2792
get_update_pts(const telegram_api::Update * update)2793 int32 UpdatesManager::get_update_pts(const telegram_api::Update *update) {
2794 switch (update->get_id()) {
2795 case telegram_api::updateNewMessage::ID:
2796 return static_cast<const telegram_api::updateNewMessage *>(update)->pts_;
2797 case telegram_api::updateReadMessagesContents::ID:
2798 return static_cast<const telegram_api::updateReadMessagesContents *>(update)->pts_;
2799 case telegram_api::updateEditMessage::ID:
2800 return static_cast<const telegram_api::updateEditMessage *>(update)->pts_;
2801 case telegram_api::updateDeleteMessages::ID:
2802 return static_cast<const telegram_api::updateDeleteMessages *>(update)->pts_;
2803 case telegram_api::updateReadHistoryInbox::ID:
2804 return static_cast<const telegram_api::updateReadHistoryInbox *>(update)->pts_;
2805 case telegram_api::updateReadHistoryOutbox::ID:
2806 return static_cast<const telegram_api::updateReadHistoryOutbox *>(update)->pts_;
2807 case telegram_api::updateWebPage::ID:
2808 return static_cast<const telegram_api::updateWebPage *>(update)->pts_;
2809 case telegram_api::updatePinnedMessages::ID:
2810 return static_cast<const telegram_api::updatePinnedMessages *>(update)->pts_;
2811 case telegram_api::updateFolderPeers::ID:
2812 return static_cast<const telegram_api::updateFolderPeers *>(update)->pts_;
2813 default:
2814 return 0;
2815 }
2816 }
2817
is_qts_update(const telegram_api::Update * update)2818 bool UpdatesManager::is_qts_update(const telegram_api::Update *update) {
2819 switch (update->get_id()) {
2820 case telegram_api::updateNewEncryptedMessage::ID:
2821 case telegram_api::updateMessagePollVote::ID:
2822 case telegram_api::updateBotStopped::ID:
2823 case telegram_api::updateChatParticipant::ID:
2824 case telegram_api::updateChannelParticipant::ID:
2825 case telegram_api::updateBotChatInviteRequester::ID:
2826 return true;
2827 default:
2828 return false;
2829 }
2830 }
2831
get_update_qts(const telegram_api::Update * update)2832 int32 UpdatesManager::get_update_qts(const telegram_api::Update *update) {
2833 switch (update->get_id()) {
2834 case telegram_api::updateNewEncryptedMessage::ID:
2835 return static_cast<const telegram_api::updateNewEncryptedMessage *>(update)->qts_;
2836 case telegram_api::updateMessagePollVote::ID:
2837 return static_cast<const telegram_api::updateMessagePollVote *>(update)->qts_;
2838 case telegram_api::updateBotStopped::ID:
2839 return static_cast<const telegram_api::updateBotStopped *>(update)->qts_;
2840 case telegram_api::updateChatParticipant::ID:
2841 return static_cast<const telegram_api::updateChatParticipant *>(update)->qts_;
2842 case telegram_api::updateChannelParticipant::ID:
2843 return static_cast<const telegram_api::updateChannelParticipant *>(update)->qts_;
2844 case telegram_api::updateBotChatInviteRequester::ID:
2845 return static_cast<const telegram_api::updateBotChatInviteRequester *>(update)->qts_;
2846 default:
2847 return 0;
2848 }
2849 }
2850
on_update(tl_object_ptr<telegram_api::updateUserTyping> update,Promise<Unit> && promise)2851 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateUserTyping> update, Promise<Unit> &&promise) {
2852 DialogId dialog_id(UserId(update->user_id_));
2853 td_->messages_manager_->on_dialog_action(dialog_id, MessageId(), dialog_id, DialogAction(std::move(update->action_)),
2854 get_short_update_date());
2855 promise.set_value(Unit());
2856 }
2857
on_update(tl_object_ptr<telegram_api::updateChatUserTyping> update,Promise<Unit> && promise)2858 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChatUserTyping> update, Promise<Unit> &&promise) {
2859 td_->messages_manager_->on_dialog_action(DialogId(ChatId(update->chat_id_)), MessageId(), DialogId(update->from_id_),
2860 DialogAction(std::move(update->action_)), get_short_update_date());
2861 promise.set_value(Unit());
2862 }
2863
on_update(tl_object_ptr<telegram_api::updateChannelUserTyping> update,Promise<Unit> && promise)2864 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelUserTyping> update, Promise<Unit> &&promise) {
2865 MessageId top_thread_message_id;
2866 if ((update->flags_ & telegram_api::updateChannelUserTyping::TOP_MSG_ID_MASK) != 0) {
2867 top_thread_message_id = MessageId(ServerMessageId(update->top_msg_id_));
2868 }
2869 td_->messages_manager_->on_dialog_action(DialogId(ChannelId(update->channel_id_)), top_thread_message_id,
2870 DialogId(update->from_id_), DialogAction(std::move(update->action_)),
2871 get_short_update_date());
2872 promise.set_value(Unit());
2873 }
2874
on_update(tl_object_ptr<telegram_api::updateEncryptedChatTyping> update,Promise<Unit> && promise)2875 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryptedChatTyping> update, Promise<Unit> &&promise) {
2876 SecretChatId secret_chat_id(update->chat_id_);
2877 UserId user_id = td_->contacts_manager_->get_secret_chat_user_id(secret_chat_id);
2878 td_->messages_manager_->on_dialog_action(DialogId(secret_chat_id), MessageId(), DialogId(user_id),
2879 DialogAction::get_typing_action(), get_short_update_date());
2880 promise.set_value(Unit());
2881 }
2882
on_update(tl_object_ptr<telegram_api::updateUserStatus> update,Promise<Unit> && promise)2883 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateUserStatus> update, Promise<Unit> &&promise) {
2884 td_->contacts_manager_->on_update_user_online(UserId(update->user_id_), std::move(update->status_));
2885 promise.set_value(Unit());
2886 }
2887
on_update(tl_object_ptr<telegram_api::updateUserName> update,Promise<Unit> && promise)2888 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateUserName> update, Promise<Unit> &&promise) {
2889 td_->contacts_manager_->on_update_user_name(UserId(update->user_id_), std::move(update->first_name_),
2890 std::move(update->last_name_), std::move(update->username_));
2891 promise.set_value(Unit());
2892 }
2893
on_update(tl_object_ptr<telegram_api::updateUserPhone> update,Promise<Unit> && promise)2894 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateUserPhone> update, Promise<Unit> &&promise) {
2895 td_->contacts_manager_->on_update_user_phone_number(UserId(update->user_id_), std::move(update->phone_));
2896 promise.set_value(Unit());
2897 }
2898
on_update(tl_object_ptr<telegram_api::updateUserPhoto> update,Promise<Unit> && promise)2899 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateUserPhoto> update, Promise<Unit> &&promise) {
2900 // TODO update->previous_, update->date_
2901 td_->contacts_manager_->on_update_user_photo(UserId(update->user_id_), std::move(update->photo_));
2902 promise.set_value(Unit());
2903 }
2904
on_update(tl_object_ptr<telegram_api::updatePeerBlocked> update,Promise<Unit> && promise)2905 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePeerBlocked> update, Promise<Unit> &&promise) {
2906 td_->messages_manager_->on_update_dialog_is_blocked(DialogId(update->peer_id_), update->blocked_);
2907 promise.set_value(Unit());
2908 }
2909
on_update(tl_object_ptr<telegram_api::updateBotCommands> update,Promise<Unit> && promise)2910 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotCommands> update, Promise<Unit> &&promise) {
2911 td_->contacts_manager_->on_update_bot_commands(DialogId(update->peer_), UserId(update->bot_id_),
2912 std::move(update->commands_));
2913 promise.set_value(Unit());
2914 }
2915
on_update(tl_object_ptr<telegram_api::updateChatParticipants> update,Promise<Unit> && promise)2916 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChatParticipants> update, Promise<Unit> &&promise) {
2917 td_->contacts_manager_->on_get_chat_participants(std::move(update->participants_), true);
2918 promise.set_value(Unit());
2919 }
2920
on_update(tl_object_ptr<telegram_api::updateChatParticipantAdd> update,Promise<Unit> && promise)2921 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChatParticipantAdd> update, Promise<Unit> &&promise) {
2922 td_->contacts_manager_->on_update_chat_add_user(ChatId(update->chat_id_), UserId(update->inviter_id_),
2923 UserId(update->user_id_), update->date_, update->version_);
2924 promise.set_value(Unit());
2925 }
2926
on_update(tl_object_ptr<telegram_api::updateChatParticipantAdmin> update,Promise<Unit> && promise)2927 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChatParticipantAdmin> update,
2928 Promise<Unit> &&promise) {
2929 td_->contacts_manager_->on_update_chat_edit_administrator(ChatId(update->chat_id_), UserId(update->user_id_),
2930 update->is_admin_, update->version_);
2931 promise.set_value(Unit());
2932 }
2933
on_update(tl_object_ptr<telegram_api::updateChatParticipantDelete> update,Promise<Unit> && promise)2934 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChatParticipantDelete> update,
2935 Promise<Unit> &&promise) {
2936 td_->contacts_manager_->on_update_chat_delete_user(ChatId(update->chat_id_), UserId(update->user_id_),
2937 update->version_);
2938 promise.set_value(Unit());
2939 }
2940
on_update(tl_object_ptr<telegram_api::updateChatDefaultBannedRights> update,Promise<Unit> && promise)2941 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChatDefaultBannedRights> update,
2942 Promise<Unit> &&promise) {
2943 DialogId dialog_id(update->peer_);
2944 RestrictedRights permissions = get_restricted_rights(std::move(update->default_banned_rights_));
2945 auto version = update->version_;
2946 switch (dialog_id.get_type()) {
2947 case DialogType::Chat:
2948 td_->contacts_manager_->on_update_chat_default_permissions(dialog_id.get_chat_id(), permissions, version);
2949 break;
2950 case DialogType::Channel:
2951 LOG_IF(ERROR, version != 0) << "Receive version " << version << " in " << dialog_id;
2952 td_->contacts_manager_->on_update_channel_default_permissions(dialog_id.get_channel_id(), permissions);
2953 break;
2954 case DialogType::None:
2955 case DialogType::User:
2956 case DialogType::SecretChat:
2957 default:
2958 LOG(ERROR) << "Receive updateChatDefaultBannedRights in " << dialog_id;
2959 break;
2960 }
2961 promise.set_value(Unit());
2962 }
2963
on_update(tl_object_ptr<telegram_api::updateDraftMessage> update,Promise<Unit> && promise)2964 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDraftMessage> update, Promise<Unit> &&promise) {
2965 td_->messages_manager_->on_update_dialog_draft_message(DialogId(update->peer_), std::move(update->draft_));
2966 promise.set_value(Unit());
2967 }
2968
on_update(tl_object_ptr<telegram_api::updateDialogPinned> update,Promise<Unit> && promise)2969 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDialogPinned> update, Promise<Unit> &&promise) {
2970 td_->messages_manager_->on_update_dialog_is_pinned(FolderId(update->folder_id_), DialogId(update->peer_),
2971 update->pinned_);
2972 promise.set_value(Unit());
2973 }
2974
on_update(tl_object_ptr<telegram_api::updatePinnedDialogs> update,Promise<Unit> && promise)2975 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedDialogs> update, Promise<Unit> &&promise) {
2976 FolderId folder_id(update->flags_ & telegram_api::updatePinnedDialogs::FOLDER_ID_MASK ? update->folder_id_ : 0);
2977 td_->messages_manager_->on_update_pinned_dialogs(folder_id); // TODO use update->order_
2978 promise.set_value(Unit());
2979 }
2980
on_update(tl_object_ptr<telegram_api::updateDialogUnreadMark> update,Promise<Unit> && promise)2981 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDialogUnreadMark> update, Promise<Unit> &&promise) {
2982 td_->messages_manager_->on_update_dialog_is_marked_as_unread(DialogId(update->peer_), update->unread_);
2983 promise.set_value(Unit());
2984 }
2985
on_update(tl_object_ptr<telegram_api::updateDialogFilter> update,Promise<Unit> && promise)2986 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDialogFilter> update, Promise<Unit> &&promise) {
2987 td_->messages_manager_->on_update_dialog_filters();
2988 promise.set_value(Unit());
2989 }
2990
on_update(tl_object_ptr<telegram_api::updateDialogFilters> update,Promise<Unit> && promise)2991 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDialogFilters> update, Promise<Unit> &&promise) {
2992 td_->messages_manager_->on_update_dialog_filters();
2993 promise.set_value(Unit());
2994 }
2995
on_update(tl_object_ptr<telegram_api::updateDialogFilterOrder> update,Promise<Unit> && promise)2996 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDialogFilterOrder> update, Promise<Unit> &&promise) {
2997 td_->messages_manager_->on_update_dialog_filters();
2998 promise.set_value(Unit());
2999 }
3000
on_update(tl_object_ptr<telegram_api::updateDcOptions> update,Promise<Unit> && promise)3001 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDcOptions> update, Promise<Unit> &&promise) {
3002 send_closure(G()->config_manager(), &ConfigManager::on_dc_options_update, DcOptions(update->dc_options_));
3003 promise.set_value(Unit());
3004 }
3005
on_update(tl_object_ptr<telegram_api::updateBotInlineQuery> update,Promise<Unit> && promise)3006 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotInlineQuery> update, Promise<Unit> &&promise) {
3007 td_->inline_queries_manager_->on_new_query(update->query_id_, UserId(update->user_id_), Location(update->geo_),
3008 std::move(update->peer_type_), update->query_, update->offset_);
3009 promise.set_value(Unit());
3010 }
3011
on_update(tl_object_ptr<telegram_api::updateBotInlineSend> update,Promise<Unit> && promise)3012 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotInlineSend> update, Promise<Unit> &&promise) {
3013 td_->inline_queries_manager_->on_chosen_result(UserId(update->user_id_), Location(update->geo_), update->query_,
3014 update->id_, std::move(update->msg_id_));
3015 promise.set_value(Unit());
3016 }
3017
on_update(tl_object_ptr<telegram_api::updateBotCallbackQuery> update,Promise<Unit> && promise)3018 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotCallbackQuery> update, Promise<Unit> &&promise) {
3019 td_->callback_queries_manager_->on_new_query(update->flags_, update->query_id_, UserId(update->user_id_),
3020 DialogId(update->peer_), MessageId(ServerMessageId(update->msg_id_)),
3021 std::move(update->data_), update->chat_instance_,
3022 std::move(update->game_short_name_));
3023 promise.set_value(Unit());
3024 }
3025
on_update(tl_object_ptr<telegram_api::updateInlineBotCallbackQuery> update,Promise<Unit> && promise)3026 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateInlineBotCallbackQuery> update,
3027 Promise<Unit> &&promise) {
3028 td_->callback_queries_manager_->on_new_inline_query(update->flags_, update->query_id_, UserId(update->user_id_),
3029 std::move(update->msg_id_), std::move(update->data_),
3030 update->chat_instance_, std::move(update->game_short_name_));
3031 promise.set_value(Unit());
3032 }
3033
on_update(tl_object_ptr<telegram_api::updateFavedStickers> update,Promise<Unit> && promise)3034 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateFavedStickers> update, Promise<Unit> &&promise) {
3035 td_->stickers_manager_->reload_favorite_stickers(true);
3036 promise.set_value(Unit());
3037 }
3038
on_update(tl_object_ptr<telegram_api::updateSavedGifs> update,Promise<Unit> && promise)3039 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateSavedGifs> update, Promise<Unit> &&promise) {
3040 td_->animations_manager_->reload_saved_animations(true);
3041 promise.set_value(Unit());
3042 }
3043
on_update(tl_object_ptr<telegram_api::updateConfig> update,Promise<Unit> && promise)3044 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateConfig> update, Promise<Unit> &&promise) {
3045 send_closure(td_->config_manager_, &ConfigManager::request_config);
3046 promise.set_value(Unit());
3047 }
3048
on_update(tl_object_ptr<telegram_api::updatePtsChanged> update,Promise<Unit> && promise)3049 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePtsChanged> update, Promise<Unit> &&promise) {
3050 set_pts(std::numeric_limits<int32>::max(), "updatePtsChanged").set_value(Unit());
3051 promise.set_value(Unit());
3052 }
3053
on_update(tl_object_ptr<telegram_api::updateEncryption> update,Promise<Unit> && promise)3054 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryption> update, Promise<Unit> &&promise) {
3055 send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_update_chat, std::move(update));
3056 promise.set_value(Unit());
3057 }
3058
on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update,Promise<Unit> && promise)3059 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, Promise<Unit> &&promise) {
3060 auto qts = update->qts_;
3061 add_pending_qts_update(std::move(update), qts, std::move(promise));
3062 }
3063
on_update(tl_object_ptr<telegram_api::updateEncryptedMessagesRead> update,Promise<Unit> && promise)3064 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryptedMessagesRead> update,
3065 Promise<Unit> &&promise) {
3066 td_->messages_manager_->read_secret_chat_outbox(SecretChatId(update->chat_id_), update->max_date_, update->date_);
3067 promise.set_value(Unit());
3068 }
3069
on_update(tl_object_ptr<telegram_api::updatePrivacy> update,Promise<Unit> && promise)3070 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePrivacy> update, Promise<Unit> &&promise) {
3071 send_closure(td_->privacy_manager_, &PrivacyManager::update_privacy, std::move(update));
3072 promise.set_value(Unit());
3073 }
3074
on_update(tl_object_ptr<telegram_api::updateNewStickerSet> update,Promise<Unit> && promise)3075 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewStickerSet> update, Promise<Unit> &&promise) {
3076 td_->stickers_manager_->on_get_messages_sticker_set(StickerSetId(), std::move(update->stickerset_), true,
3077 "updateNewStickerSet");
3078 promise.set_value(Unit());
3079 }
3080
on_update(tl_object_ptr<telegram_api::updateStickerSets> update,Promise<Unit> && promise)3081 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateStickerSets> update, Promise<Unit> &&promise) {
3082 td_->stickers_manager_->on_update_sticker_sets();
3083 promise.set_value(Unit());
3084 }
3085
on_update(tl_object_ptr<telegram_api::updateStickerSetsOrder> update,Promise<Unit> && promise)3086 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateStickerSetsOrder> update, Promise<Unit> &&promise) {
3087 td_->stickers_manager_->on_update_sticker_sets_order(update->masks_,
3088 StickersManager::convert_sticker_set_ids(update->order_));
3089 promise.set_value(Unit());
3090 }
3091
on_update(tl_object_ptr<telegram_api::updateReadFeaturedStickers> update,Promise<Unit> && promise)3092 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadFeaturedStickers> update,
3093 Promise<Unit> &&promise) {
3094 td_->stickers_manager_->reload_featured_sticker_sets(true);
3095 promise.set_value(Unit());
3096 }
3097
on_update(tl_object_ptr<telegram_api::updateRecentStickers> update,Promise<Unit> && promise)3098 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateRecentStickers> update, Promise<Unit> &&promise) {
3099 td_->stickers_manager_->reload_recent_stickers(false, true);
3100 td_->stickers_manager_->reload_recent_stickers(true, true);
3101 promise.set_value(Unit());
3102 }
3103
on_update(tl_object_ptr<telegram_api::updateBotShippingQuery> update,Promise<Unit> && promise)3104 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotShippingQuery> update, Promise<Unit> &&promise) {
3105 UserId user_id(update->user_id_);
3106 if (!user_id.is_valid()) {
3107 LOG(ERROR) << "Receive shipping query from invalid " << user_id;
3108 } else {
3109 CHECK(update->shipping_address_ != nullptr);
3110
3111 send_closure(
3112 G()->td(), &Td::send_update,
3113 make_tl_object<td_api::updateNewShippingQuery>(
3114 update->query_id_, td_->contacts_manager_->get_user_id_object(user_id, "updateNewShippingQuery"),
3115 update->payload_.as_slice().str(),
3116 get_address_object(get_address(std::move(update->shipping_address_))))); // TODO use convert_address
3117 }
3118 promise.set_value(Unit());
3119 }
3120
on_update(tl_object_ptr<telegram_api::updateBotPrecheckoutQuery> update,Promise<Unit> && promise)3121 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotPrecheckoutQuery> update, Promise<Unit> &&promise) {
3122 UserId user_id(update->user_id_);
3123 if (!user_id.is_valid()) {
3124 LOG(ERROR) << "Receive pre-checkout query from invalid " << user_id;
3125 } else {
3126 send_closure(
3127 G()->td(), &Td::send_update,
3128 make_tl_object<td_api::updateNewPreCheckoutQuery>(
3129 update->query_id_, td_->contacts_manager_->get_user_id_object(user_id, "updateNewPreCheckoutQuery"),
3130 update->currency_, update->total_amount_, update->payload_.as_slice().str(), update->shipping_option_id_,
3131 get_order_info_object(get_order_info(std::move(update->info_)))));
3132 }
3133 promise.set_value(Unit());
3134 }
3135
on_update(tl_object_ptr<telegram_api::updateBotWebhookJSON> update,Promise<Unit> && promise)3136 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotWebhookJSON> update, Promise<Unit> &&promise) {
3137 send_closure(G()->td(), &Td::send_update, make_tl_object<td_api::updateNewCustomEvent>(update->data_->data_));
3138 promise.set_value(Unit());
3139 }
3140
on_update(tl_object_ptr<telegram_api::updateBotWebhookJSONQuery> update,Promise<Unit> && promise)3141 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotWebhookJSONQuery> update, Promise<Unit> &&promise) {
3142 send_closure(G()->td(), &Td::send_update,
3143 make_tl_object<td_api::updateNewCustomQuery>(update->query_id_, update->data_->data_, update->timeout_));
3144 promise.set_value(Unit());
3145 }
3146
on_update(tl_object_ptr<telegram_api::updatePhoneCall> update,Promise<Unit> && promise)3147 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePhoneCall> update, Promise<Unit> &&promise) {
3148 send_closure(G()->call_manager(), &CallManager::update_call, std::move(update));
3149 promise.set_value(Unit());
3150 }
3151
on_update(tl_object_ptr<telegram_api::updatePhoneCallSignalingData> update,Promise<Unit> && promise)3152 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePhoneCallSignalingData> update,
3153 Promise<Unit> &&promise) {
3154 send_closure(G()->call_manager(), &CallManager::update_call_signaling_data, update->phone_call_id_,
3155 update->data_.as_slice().str());
3156 promise.set_value(Unit());
3157 }
3158
on_update(tl_object_ptr<telegram_api::updateGroupCallConnection> update,Promise<Unit> && promise)3159 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateGroupCallConnection> update, Promise<Unit> &&promise) {
3160 if (update->presentation_) {
3161 LOG(ERROR) << "Receive unexpected updateGroupCallConnection";
3162 } else {
3163 send_closure(G()->group_call_manager(), &GroupCallManager::on_update_group_call_connection,
3164 std::move(update->params_->data_));
3165 }
3166 promise.set_value(Unit());
3167 }
3168
on_update(tl_object_ptr<telegram_api::updateGroupCall> update,Promise<Unit> && promise)3169 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateGroupCall> update, Promise<Unit> &&promise) {
3170 DialogId dialog_id(ChatId(update->chat_id_));
3171 if (!td_->messages_manager_->have_dialog_force(dialog_id, "updateGroupCall")) {
3172 dialog_id = DialogId(ChannelId(update->chat_id_));
3173 if (!td_->messages_manager_->have_dialog_force(dialog_id, "updateGroupCall")) {
3174 dialog_id = DialogId();
3175 }
3176 }
3177 send_closure(G()->group_call_manager(), &GroupCallManager::on_update_group_call, std::move(update->call_), dialog_id);
3178 promise.set_value(Unit());
3179 }
3180
on_update(tl_object_ptr<telegram_api::updateGroupCallParticipants> update,Promise<Unit> && promise)3181 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateGroupCallParticipants> update,
3182 Promise<Unit> &&promise) {
3183 send_closure(G()->group_call_manager(), &GroupCallManager::on_update_group_call_participants,
3184 InputGroupCallId(update->call_), std::move(update->participants_), update->version_, false);
3185 promise.set_value(Unit());
3186 }
3187
on_update(tl_object_ptr<telegram_api::updateContactsReset> update,Promise<Unit> && promise)3188 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateContactsReset> update, Promise<Unit> &&promise) {
3189 td_->contacts_manager_->on_update_contacts_reset();
3190 promise.set_value(Unit());
3191 }
3192
on_update(tl_object_ptr<telegram_api::updateLangPackTooLong> update,Promise<Unit> && promise)3193 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateLangPackTooLong> update, Promise<Unit> &&promise) {
3194 send_closure(G()->language_pack_manager(), &LanguagePackManager::on_language_pack_too_long,
3195 std::move(update->lang_code_));
3196 promise.set_value(Unit());
3197 }
3198
on_update(tl_object_ptr<telegram_api::updateLangPack> update,Promise<Unit> && promise)3199 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateLangPack> update, Promise<Unit> &&promise) {
3200 send_closure(G()->language_pack_manager(), &LanguagePackManager::on_update_language_pack,
3201 std::move(update->difference_));
3202 promise.set_value(Unit());
3203 }
3204
on_update(tl_object_ptr<telegram_api::updateGeoLiveViewed> update,Promise<Unit> && promise)3205 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateGeoLiveViewed> update, Promise<Unit> &&promise) {
3206 td_->messages_manager_->on_update_live_location_viewed(
3207 {DialogId(update->peer_), MessageId(ServerMessageId(update->msg_id_))});
3208 promise.set_value(Unit());
3209 }
3210
on_update(tl_object_ptr<telegram_api::updateMessagePoll> update,Promise<Unit> && promise)3211 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateMessagePoll> update, Promise<Unit> &&promise) {
3212 td_->poll_manager_->on_get_poll(PollId(update->poll_id_), std::move(update->poll_), std::move(update->results_));
3213 promise.set_value(Unit());
3214 }
3215
on_update(tl_object_ptr<telegram_api::updateMessagePollVote> update,Promise<Unit> && promise)3216 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateMessagePollVote> update, Promise<Unit> &&promise) {
3217 auto qts = update->qts_;
3218 add_pending_qts_update(std::move(update), qts, std::move(promise));
3219 }
3220
on_update(tl_object_ptr<telegram_api::updateNewScheduledMessage> update,Promise<Unit> && promise)3221 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewScheduledMessage> update, Promise<Unit> &&promise) {
3222 td_->messages_manager_->on_get_message(std::move(update->message_), true, false, true, true, true,
3223 "updateNewScheduledMessage");
3224 promise.set_value(Unit());
3225 }
3226
on_update(tl_object_ptr<telegram_api::updateDeleteScheduledMessages> update,Promise<Unit> && promise)3227 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteScheduledMessages> update,
3228 Promise<Unit> &&promise) {
3229 vector<ScheduledServerMessageId> message_ids = transform(update->messages_, [](int32 scheduled_server_message_id) {
3230 return ScheduledServerMessageId(scheduled_server_message_id);
3231 });
3232
3233 td_->messages_manager_->on_update_delete_scheduled_messages(DialogId(update->peer_), std::move(message_ids));
3234 promise.set_value(Unit());
3235 }
3236
on_update(tl_object_ptr<telegram_api::updateLoginToken> update,Promise<Unit> && promise)3237 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateLoginToken> update, Promise<Unit> &&promise) {
3238 LOG(INFO) << "Ignore updateLoginToken after authorization";
3239 promise.set_value(Unit());
3240 }
3241
on_update(tl_object_ptr<telegram_api::updateBotStopped> update,Promise<Unit> && promise)3242 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotStopped> update, Promise<Unit> &&promise) {
3243 auto qts = update->qts_;
3244 add_pending_qts_update(std::move(update), qts, std::move(promise));
3245 }
3246
on_update(tl_object_ptr<telegram_api::updateChatParticipant> update,Promise<Unit> && promise)3247 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChatParticipant> update, Promise<Unit> &&promise) {
3248 auto qts = update->qts_;
3249 add_pending_qts_update(std::move(update), qts, std::move(promise));
3250 }
3251
on_update(tl_object_ptr<telegram_api::updateChannelParticipant> update,Promise<Unit> && promise)3252 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelParticipant> update, Promise<Unit> &&promise) {
3253 auto qts = update->qts_;
3254 add_pending_qts_update(std::move(update), qts, std::move(promise));
3255 }
3256
on_update(tl_object_ptr<telegram_api::updateBotChatInviteRequester> update,Promise<Unit> && promise)3257 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateBotChatInviteRequester> update,
3258 Promise<Unit> &&promise) {
3259 auto qts = update->qts_;
3260 add_pending_qts_update(std::move(update), qts, std::move(promise));
3261 }
3262
on_update(tl_object_ptr<telegram_api::updateTheme> update,Promise<Unit> && promise)3263 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateTheme> update, Promise<Unit> &&promise) {
3264 td_->theme_manager_->on_update_theme(std::move(update->theme_), std::move(promise));
3265 }
3266
on_update(tl_object_ptr<telegram_api::updatePendingJoinRequests> update,Promise<Unit> && promise)3267 void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePendingJoinRequests> update, Promise<Unit> &&promise) {
3268 td_->messages_manager_->on_update_dialog_pending_join_requests(DialogId(update->peer_), update->requests_pending_,
3269 std::move(update->recent_requesters_));
3270 promise.set_value(Unit());
3271 }
3272
3273 // unsupported updates
3274
3275 } // namespace td
3276