1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/SequenceDispatcher.h"
8
9 #include "td/telegram/Global.h"
10 #include "td/telegram/net/NetQueryDispatcher.h"
11
12 #include "td/actor/PromiseFuture.h"
13
14 #include "td/utils/format.h"
15 #include "td/utils/logging.h"
16 #include "td/utils/misc.h"
17 #include "td/utils/SliceBuilder.h"
18 #include "td/utils/Status.h"
19
20 #include <limits>
21
22 namespace td {
23
/*** Sequence Dispatcher ***/
// Sends queries with invokeAfter.
//
// Each query has three states: Start/Wait/Finish.
//
// finish_i points to the first query that is not in the Finish state.
// next_i points to the next query to be sent.
//
// Each query has the generation of its InvokeAfter chain.
//
// When a query is sent, its generation is set to the current chain generation.
//
// When a query fails and its generation is equal to the current generation, we must start a new chain:
// increment the generation and set next_i to finish_i.
//
// last_sent_i points to the last sent query in the current chain.
40 //
send_with_callback(NetQueryPtr query,ActorShared<NetQueryCallback> callback)41 void SequenceDispatcher::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback) {
42 cancel_timeout();
43 query->debug("Waiting at SequenceDispatcher");
44 auto query_weak_ref = query.get_weak();
45 data_.push_back(Data{State::Start, std::move(query_weak_ref), std::move(query), std::move(callback), 0, 0.0, 0.0});
46 loop();
47 }
48
check_timeout(Data & data)49 void SequenceDispatcher::check_timeout(Data &data) {
50 if (data.state_ != State::Start) {
51 return;
52 }
53 data.query_->total_timeout_ += data.total_timeout_;
54 data.total_timeout_ = 0;
55 if (data.query_->total_timeout_ > data.query_->total_timeout_limit_) {
56 LOG(WARNING) << "Fail " << data.query_ << " to " << data.query_->source_ << " because total_timeout "
57 << data.query_->total_timeout_ << " is greater than total_timeout_limit "
58 << data.query_->total_timeout_limit_;
59 data.query_->set_error(Status::Error(
60 429, PSLICE() << "Too Many Requests: retry after " << static_cast<int32>(data.last_timeout_ + 0.999)));
61 data.state_ = State::Dummy;
62 try_resend_query(data, std::move(data.query_));
63 }
64 }
65
try_resend_query(Data & data,NetQueryPtr query)66 void SequenceDispatcher::try_resend_query(Data &data, NetQueryPtr query) {
67 size_t pos = &data - &data_[0];
68 CHECK(pos < data_.size());
69 CHECK(data.state_ == State::Dummy);
70 data.state_ = State::Wait;
71 wait_cnt_++;
72 auto token = pos + id_offset_;
73 // TODO: if query is ok, use NetQueryCallback::on_result
74 auto promise = PromiseCreator::lambda([&, self = actor_shared(this, token)](NetQueryPtr query) mutable {
75 if (!query.empty()) {
76 send_closure(std::move(self), &SequenceDispatcher::on_resend_ok, std::move(query));
77 } else {
78 send_closure(std::move(self), &SequenceDispatcher::on_resend_error);
79 }
80 });
81 send_closure(data.callback_, &NetQueryCallback::on_result_resendable, std::move(query), std::move(promise));
82 }
83
data_from_token()84 SequenceDispatcher::Data &SequenceDispatcher::data_from_token() {
85 auto token = narrow_cast<size_t>(get_link_token());
86 auto pos = token - id_offset_;
87 CHECK(pos < data_.size());
88 auto &data = data_[pos];
89 CHECK(data.state_ == State::Wait);
90 CHECK(wait_cnt_ > 0);
91 wait_cnt_--;
92 data.state_ = State::Dummy;
93 return data;
94 }
95
on_resend_ok(NetQueryPtr query)96 void SequenceDispatcher::on_resend_ok(NetQueryPtr query) {
97 auto &data = data_from_token();
98 data.query_ = std::move(query);
99 do_resend(data);
100 loop();
101 }
102
on_resend_error()103 void SequenceDispatcher::on_resend_error() {
104 auto &data = data_from_token();
105 do_finish(data);
106 loop();
107 }
108
do_resend(Data & data)109 void SequenceDispatcher::do_resend(Data &data) {
110 CHECK(data.state_ == State::Dummy);
111 data.state_ = State::Start;
112 if (data.generation_ == generation_) {
113 next_i_ = finish_i_;
114 generation_++;
115 last_sent_i_ = std::numeric_limits<size_t>::max();
116 }
117 check_timeout(data);
118 }
119
do_finish(Data & data)120 void SequenceDispatcher::do_finish(Data &data) {
121 CHECK(data.state_ == State::Dummy);
122 data.state_ = State::Finish;
123 if (!parent_.empty()) {
124 send_closure(parent_, &Parent::on_result);
125 }
126 }
127
on_result(NetQueryPtr query)128 void SequenceDispatcher::on_result(NetQueryPtr query) {
129 auto &data = data_from_token();
130 size_t pos = &data - &data_[0];
131 CHECK(pos < data_.size());
132
133 if (query->last_timeout_ != 0) {
134 for (auto i = pos + 1; i < data_.size(); i++) {
135 data_[i].total_timeout_ += query->last_timeout_;
136 data_[i].last_timeout_ = query->last_timeout_;
137 check_timeout(data_[i]);
138 }
139 }
140
141 if (query->is_error() && (query->error().code() == NetQuery::ResendInvokeAfter ||
142 (query->error().code() == 400 && (query->error().message() == "MSG_WAIT_FAILED" ||
143 query->error().message() == "MSG_WAIT_TIMEOUT")))) {
144 VLOG(net_query) << "Resend " << query;
145 query->resend();
146 query->debug("Waiting at SequenceDispatcher");
147 data.query_ = std::move(query);
148 do_resend(data);
149 } else {
150 try_resend_query(data, std::move(query));
151 }
152 loop();
153 }
154
loop()155 void SequenceDispatcher::loop() {
156 for (; finish_i_ < data_.size() && data_[finish_i_].state_ == State::Finish; finish_i_++) {
157 }
158 if (next_i_ < finish_i_) {
159 next_i_ = finish_i_;
160 }
161 for (; next_i_ < data_.size() && data_[next_i_].state_ != State::Wait && wait_cnt_ < MAX_SIMULTANEOUS_WAIT;
162 next_i_++) {
163 if (data_[next_i_].state_ == State::Finish) {
164 continue;
165 }
166 NetQueryRef invoke_after;
167 if (last_sent_i_ != std::numeric_limits<size_t>::max() && data_[last_sent_i_].state_ == State::Wait) {
168 invoke_after = data_[last_sent_i_].net_query_ref_;
169 }
170 data_[next_i_].query_->set_invoke_after(invoke_after);
171 data_[next_i_].query_->last_timeout_ = 0;
172
173 VLOG(net_query) << "Send " << data_[next_i_].query_;
174
175 data_[next_i_].query_->debug("send to Td::send_with_callback");
176 data_[next_i_].query_->set_session_rand(session_rand_);
177 G()->net_query_dispatcher().dispatch_with_callback(std::move(data_[next_i_].query_),
178 actor_shared(this, next_i_ + id_offset_));
179 data_[next_i_].state_ = State::Wait;
180 wait_cnt_++;
181 data_[next_i_].generation_ = generation_;
182 last_sent_i_ = next_i_;
183 }
184
185 try_shrink();
186
187 if (finish_i_ == data_.size() && !parent_.empty()) {
188 set_timeout_in(5);
189 }
190 }
191
try_shrink()192 void SequenceDispatcher::try_shrink() {
193 if (finish_i_ * 2 > data_.size() && data_.size() > 5) {
194 CHECK(finish_i_ <= next_i_);
195 data_.erase(data_.begin(), data_.begin() + finish_i_);
196 next_i_ -= finish_i_;
197 if (last_sent_i_ != std::numeric_limits<size_t>::max()) {
198 if (last_sent_i_ >= finish_i_) {
199 last_sent_i_ -= finish_i_;
200 } else {
201 last_sent_i_ = std::numeric_limits<size_t>::max();
202 }
203 }
204 id_offset_ += finish_i_;
205 finish_i_ = 0;
206 }
207 }
208
timeout_expired()209 void SequenceDispatcher::timeout_expired() {
210 if (finish_i_ != data_.size()) {
211 return;
212 }
213 CHECK(!parent_.empty());
214 set_timeout_in(1);
215 LOG(DEBUG) << "SequenceDispatcher ready to close";
216 send_closure(parent_, &Parent::ready_to_close);
217 }
218
hangup()219 void SequenceDispatcher::hangup() {
220 stop();
221 }
222
tear_down()223 void SequenceDispatcher::tear_down() {
224 for (auto &data : data_) {
225 if (data.query_.empty()) {
226 continue;
227 }
228 data.state_ = State::Dummy;
229 data.query_->set_error(Global::request_aborted_error());
230 do_finish(data);
231 }
232 }
233
close_silent()234 void SequenceDispatcher::close_silent() {
235 for (auto &data : data_) {
236 if (!data.query_.empty()) {
237 data.query_->clear();
238 }
239 }
240 stop();
241 }
242
243 /*** MultiSequenceDispatcher ***/
send_with_callback(NetQueryPtr query,ActorShared<NetQueryCallback> callback,uint64 sequence_id)244 void MultiSequenceDispatcher::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback,
245 uint64 sequence_id) {
246 CHECK(sequence_id != 0);
247 auto it_ok = dispatchers_.emplace(sequence_id, Data{0, ActorOwn<SequenceDispatcher>()});
248 auto &data = it_ok.first->second;
249 if (it_ok.second) {
250 LOG(DEBUG) << "Create SequenceDispatcher" << sequence_id;
251 data.dispatcher_ = create_actor<SequenceDispatcher>("sequence dispatcher", actor_shared(this, sequence_id));
252 }
253 data.cnt_++;
254 query->debug(PSTRING() << "send to SequenceDispatcher " << tag("sequence_id", sequence_id));
255 send_closure(data.dispatcher_, &SequenceDispatcher::send_with_callback, std::move(query), std::move(callback));
256 }
257
on_result()258 void MultiSequenceDispatcher::on_result() {
259 auto it = dispatchers_.find(get_link_token());
260 CHECK(it != dispatchers_.end());
261 it->second.cnt_--;
262 }
263
ready_to_close()264 void MultiSequenceDispatcher::ready_to_close() {
265 auto it = dispatchers_.find(get_link_token());
266 CHECK(it != dispatchers_.end());
267 if (it->second.cnt_ == 0) {
268 LOG(DEBUG) << "Close SequenceDispatcher " << get_link_token();
269 dispatchers_.erase(it);
270 }
271 }
272
273 } // namespace td
274