1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/SequenceDispatcher.h"
8 
9 #include "td/telegram/Global.h"
10 #include "td/telegram/net/NetQueryDispatcher.h"
11 
12 #include "td/actor/PromiseFuture.h"
13 
14 #include "td/utils/format.h"
15 #include "td/utils/logging.h"
16 #include "td/utils/misc.h"
17 #include "td/utils/SliceBuilder.h"
18 #include "td/utils/Status.h"
19 
20 #include <limits>
21 
22 namespace td {
23 
24 /*** Sequence Dispatcher ***/
25 // Sends queries with invokeAfter.
26 //
27 // Each query has three states Start/Wait/Finish
28 //
29 // finish_i points to the first not Finish query.
30 // next_i points to the next query to be sent.
31 //
32 // Each query has generation of InvokeAfter chain.
33 //
// When a query is sent, its generation is set to the current chain generation.
//
// When a query fails and its generation equals the current generation, we must start a new chain:
// increment the generation and set next_i to finish_i.
38 //
39 // last_sent_i points to the last sent query in current chain.
40 //
send_with_callback(NetQueryPtr query,ActorShared<NetQueryCallback> callback)41 void SequenceDispatcher::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback) {
42   cancel_timeout();
43   query->debug("Waiting at SequenceDispatcher");
44   auto query_weak_ref = query.get_weak();
45   data_.push_back(Data{State::Start, std::move(query_weak_ref), std::move(query), std::move(callback), 0, 0.0, 0.0});
46   loop();
47 }
48 
check_timeout(Data & data)49 void SequenceDispatcher::check_timeout(Data &data) {
50   if (data.state_ != State::Start) {
51     return;
52   }
53   data.query_->total_timeout_ += data.total_timeout_;
54   data.total_timeout_ = 0;
55   if (data.query_->total_timeout_ > data.query_->total_timeout_limit_) {
56     LOG(WARNING) << "Fail " << data.query_ << " to " << data.query_->source_ << " because total_timeout "
57                  << data.query_->total_timeout_ << " is greater than total_timeout_limit "
58                  << data.query_->total_timeout_limit_;
59     data.query_->set_error(Status::Error(
60         429, PSLICE() << "Too Many Requests: retry after " << static_cast<int32>(data.last_timeout_ + 0.999)));
61     data.state_ = State::Dummy;
62     try_resend_query(data, std::move(data.query_));
63   }
64 }
65 
try_resend_query(Data & data,NetQueryPtr query)66 void SequenceDispatcher::try_resend_query(Data &data, NetQueryPtr query) {
67   size_t pos = &data - &data_[0];
68   CHECK(pos < data_.size());
69   CHECK(data.state_ == State::Dummy);
70   data.state_ = State::Wait;
71   wait_cnt_++;
72   auto token = pos + id_offset_;
73   // TODO: if query is ok, use NetQueryCallback::on_result
74   auto promise = PromiseCreator::lambda([&, self = actor_shared(this, token)](NetQueryPtr query) mutable {
75     if (!query.empty()) {
76       send_closure(std::move(self), &SequenceDispatcher::on_resend_ok, std::move(query));
77     } else {
78       send_closure(std::move(self), &SequenceDispatcher::on_resend_error);
79     }
80   });
81   send_closure(data.callback_, &NetQueryCallback::on_result_resendable, std::move(query), std::move(promise));
82 }
83 
data_from_token()84 SequenceDispatcher::Data &SequenceDispatcher::data_from_token() {
85   auto token = narrow_cast<size_t>(get_link_token());
86   auto pos = token - id_offset_;
87   CHECK(pos < data_.size());
88   auto &data = data_[pos];
89   CHECK(data.state_ == State::Wait);
90   CHECK(wait_cnt_ > 0);
91   wait_cnt_--;
92   data.state_ = State::Dummy;
93   return data;
94 }
95 
on_resend_ok(NetQueryPtr query)96 void SequenceDispatcher::on_resend_ok(NetQueryPtr query) {
97   auto &data = data_from_token();
98   data.query_ = std::move(query);
99   do_resend(data);
100   loop();
101 }
102 
on_resend_error()103 void SequenceDispatcher::on_resend_error() {
104   auto &data = data_from_token();
105   do_finish(data);
106   loop();
107 }
108 
do_resend(Data & data)109 void SequenceDispatcher::do_resend(Data &data) {
110   CHECK(data.state_ == State::Dummy);
111   data.state_ = State::Start;
112   if (data.generation_ == generation_) {
113     next_i_ = finish_i_;
114     generation_++;
115     last_sent_i_ = std::numeric_limits<size_t>::max();
116   }
117   check_timeout(data);
118 }
119 
do_finish(Data & data)120 void SequenceDispatcher::do_finish(Data &data) {
121   CHECK(data.state_ == State::Dummy);
122   data.state_ = State::Finish;
123   if (!parent_.empty()) {
124     send_closure(parent_, &Parent::on_result);
125   }
126 }
127 
on_result(NetQueryPtr query)128 void SequenceDispatcher::on_result(NetQueryPtr query) {
129   auto &data = data_from_token();
130   size_t pos = &data - &data_[0];
131   CHECK(pos < data_.size());
132 
133   if (query->last_timeout_ != 0) {
134     for (auto i = pos + 1; i < data_.size(); i++) {
135       data_[i].total_timeout_ += query->last_timeout_;
136       data_[i].last_timeout_ = query->last_timeout_;
137       check_timeout(data_[i]);
138     }
139   }
140 
141   if (query->is_error() && (query->error().code() == NetQuery::ResendInvokeAfter ||
142                             (query->error().code() == 400 && (query->error().message() == "MSG_WAIT_FAILED" ||
143                                                               query->error().message() == "MSG_WAIT_TIMEOUT")))) {
144     VLOG(net_query) << "Resend " << query;
145     query->resend();
146     query->debug("Waiting at SequenceDispatcher");
147     data.query_ = std::move(query);
148     do_resend(data);
149   } else {
150     try_resend_query(data, std::move(query));
151   }
152   loop();
153 }
154 
loop()155 void SequenceDispatcher::loop() {
156   for (; finish_i_ < data_.size() && data_[finish_i_].state_ == State::Finish; finish_i_++) {
157   }
158   if (next_i_ < finish_i_) {
159     next_i_ = finish_i_;
160   }
161   for (; next_i_ < data_.size() && data_[next_i_].state_ != State::Wait && wait_cnt_ < MAX_SIMULTANEOUS_WAIT;
162        next_i_++) {
163     if (data_[next_i_].state_ == State::Finish) {
164       continue;
165     }
166     NetQueryRef invoke_after;
167     if (last_sent_i_ != std::numeric_limits<size_t>::max() && data_[last_sent_i_].state_ == State::Wait) {
168       invoke_after = data_[last_sent_i_].net_query_ref_;
169     }
170     data_[next_i_].query_->set_invoke_after(invoke_after);
171     data_[next_i_].query_->last_timeout_ = 0;
172 
173     VLOG(net_query) << "Send " << data_[next_i_].query_;
174 
175     data_[next_i_].query_->debug("send to Td::send_with_callback");
176     data_[next_i_].query_->set_session_rand(session_rand_);
177     G()->net_query_dispatcher().dispatch_with_callback(std::move(data_[next_i_].query_),
178                                                        actor_shared(this, next_i_ + id_offset_));
179     data_[next_i_].state_ = State::Wait;
180     wait_cnt_++;
181     data_[next_i_].generation_ = generation_;
182     last_sent_i_ = next_i_;
183   }
184 
185   try_shrink();
186 
187   if (finish_i_ == data_.size() && !parent_.empty()) {
188     set_timeout_in(5);
189   }
190 }
191 
try_shrink()192 void SequenceDispatcher::try_shrink() {
193   if (finish_i_ * 2 > data_.size() && data_.size() > 5) {
194     CHECK(finish_i_ <= next_i_);
195     data_.erase(data_.begin(), data_.begin() + finish_i_);
196     next_i_ -= finish_i_;
197     if (last_sent_i_ != std::numeric_limits<size_t>::max()) {
198       if (last_sent_i_ >= finish_i_) {
199         last_sent_i_ -= finish_i_;
200       } else {
201         last_sent_i_ = std::numeric_limits<size_t>::max();
202       }
203     }
204     id_offset_ += finish_i_;
205     finish_i_ = 0;
206   }
207 }
208 
timeout_expired()209 void SequenceDispatcher::timeout_expired() {
210   if (finish_i_ != data_.size()) {
211     return;
212   }
213   CHECK(!parent_.empty());
214   set_timeout_in(1);
215   LOG(DEBUG) << "SequenceDispatcher ready to close";
216   send_closure(parent_, &Parent::ready_to_close);
217 }
218 
hangup()219 void SequenceDispatcher::hangup() {
220   stop();
221 }
222 
tear_down()223 void SequenceDispatcher::tear_down() {
224   for (auto &data : data_) {
225     if (data.query_.empty()) {
226       continue;
227     }
228     data.state_ = State::Dummy;
229     data.query_->set_error(Global::request_aborted_error());
230     do_finish(data);
231   }
232 }
233 
close_silent()234 void SequenceDispatcher::close_silent() {
235   for (auto &data : data_) {
236     if (!data.query_.empty()) {
237       data.query_->clear();
238     }
239   }
240   stop();
241 }
242 
243 /*** MultiSequenceDispatcher ***/
send_with_callback(NetQueryPtr query,ActorShared<NetQueryCallback> callback,uint64 sequence_id)244 void MultiSequenceDispatcher::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback,
245                                                  uint64 sequence_id) {
246   CHECK(sequence_id != 0);
247   auto it_ok = dispatchers_.emplace(sequence_id, Data{0, ActorOwn<SequenceDispatcher>()});
248   auto &data = it_ok.first->second;
249   if (it_ok.second) {
250     LOG(DEBUG) << "Create SequenceDispatcher" << sequence_id;
251     data.dispatcher_ = create_actor<SequenceDispatcher>("sequence dispatcher", actor_shared(this, sequence_id));
252   }
253   data.cnt_++;
254   query->debug(PSTRING() << "send to SequenceDispatcher " << tag("sequence_id", sequence_id));
255   send_closure(data.dispatcher_, &SequenceDispatcher::send_with_callback, std::move(query), std::move(callback));
256 }
257 
on_result()258 void MultiSequenceDispatcher::on_result() {
259   auto it = dispatchers_.find(get_link_token());
260   CHECK(it != dispatchers_.end());
261   it->second.cnt_--;
262 }
263 
ready_to_close()264 void MultiSequenceDispatcher::ready_to_close() {
265   auto it = dispatchers_.find(get_link_token());
266   CHECK(it != dispatchers_.end());
267   if (it->second.cnt_ == 0) {
268     LOG(DEBUG) << "Close SequenceDispatcher " << get_link_token();
269     dispatchers_.erase(it);
270   }
271 }
272 
273 }  // namespace td
274