1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/net/HttpConnectionBase.h"
8
9 #include "td/net/HttpHeaderCreator.h"
10
11 #include "td/utils/common.h"
12 #include "td/utils/logging.h"
13 #include "td/utils/misc.h"
14 #include "td/utils/port/detail/PollableFd.h"
15
16 namespace td {
17 namespace detail {
18
HttpConnectionBase(State state,BufferedFd<SocketFd> fd,SslStream ssl_stream,size_t max_post_size,size_t max_files,int32 idle_timeout,int32 slow_scheduler_id)19 HttpConnectionBase::HttpConnectionBase(State state, BufferedFd<SocketFd> fd, SslStream ssl_stream, size_t max_post_size,
20 size_t max_files, int32 idle_timeout, int32 slow_scheduler_id)
21 : state_(state)
22 , fd_(std::move(fd))
23 , ssl_stream_(std::move(ssl_stream))
24 , max_post_size_(max_post_size)
25 , max_files_(max_files)
26 , idle_timeout_(idle_timeout)
27 , slow_scheduler_id_(slow_scheduler_id) {
28 CHECK(state_ != State::Close);
29
30 if (ssl_stream_) {
31 read_source_ >> ssl_stream_.read_byte_flow() >> read_sink_;
32 write_source_ >> ssl_stream_.write_byte_flow() >> write_sink_;
33 } else {
34 read_source_ >> read_sink_;
35 write_source_ >> write_sink_;
36 }
37 peer_address_.init_peer_address(fd_).ignore();
38 }
39
live_event()40 void HttpConnectionBase::live_event() {
41 if (idle_timeout_ != 0) {
42 set_timeout_in(idle_timeout_);
43 }
44 }
45
start_up()46 void HttpConnectionBase::start_up() {
47 Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this));
48 reader_.init(read_sink_.get_output(), max_post_size_, max_files_);
49 if (state_ == State::Read) {
50 current_query_ = make_unique<HttpQuery>();
51 }
52 live_event();
53 yield();
54 }
tear_down()55 void HttpConnectionBase::tear_down() {
56 Scheduler::unsubscribe_before_close(fd_.get_poll_info().get_pollable_fd_ref());
57 fd_.close();
58 }
59
write_next_noflush(BufferSlice buffer)60 void HttpConnectionBase::write_next_noflush(BufferSlice buffer) {
61 CHECK(state_ == State::Write);
62 write_buffer_.append(std::move(buffer));
63 }
write_next(BufferSlice buffer)64 void HttpConnectionBase::write_next(BufferSlice buffer) {
65 write_next_noflush(std::move(buffer));
66 loop();
67 }
68
write_ok()69 void HttpConnectionBase::write_ok() {
70 CHECK(state_ == State::Write);
71 current_query_ = make_unique<HttpQuery>();
72 state_ = State::Read;
73 live_event();
74 loop();
75 }
76
write_error(Status error)77 void HttpConnectionBase::write_error(Status error) {
78 CHECK(state_ == State::Write);
79 LOG(WARNING) << "Close HTTP connection: " << error;
80 state_ = State::Close;
81 loop();
82 }
83
timeout_expired()84 void HttpConnectionBase::timeout_expired() {
85 LOG(INFO) << "Idle timeout expired";
86
87 if (fd_.need_flush_write()) {
88 on_error(Status::Error("Write timeout expired"));
89 } else if (state_ == State::Read) {
90 on_error(Status::Error("Read timeout expired"));
91 }
92
93 stop();
94 }
loop()95 void HttpConnectionBase::loop() {
96 if (ssl_stream_) {
97 //ssl_stream_.read_byte_flow().set_need_size(0);
98 ssl_stream_.write_byte_flow().reset_need_size();
99 }
100 sync_with_poll(fd_);
101 if (can_read_local(fd_)) {
102 LOG(DEBUG) << "Can read from the connection";
103 auto r = fd_.flush_read();
104 if (r.is_error()) {
105 if (!begins_with(r.error().message(), "SSL error {336134278")) { // if error is not yet outputed
106 LOG(INFO) << "Receive flush_read error: " << r.error();
107 }
108 on_error(Status::Error(r.error().public_message()));
109 return stop();
110 }
111 }
112 read_source_.wakeup();
113
114 // TODO: read_next even when state_ == State::Write
115
116 bool want_read = false;
117 bool can_be_slow = slow_scheduler_id_ == -1;
118 if (state_ == State::Read) {
119 auto res = reader_.read_next(current_query_.get(), can_be_slow);
120 if (res.is_error()) {
121 if (res.error().message() == "SLOW") {
122 LOG(INFO) << "Slow HTTP connection: migrate to " << slow_scheduler_id_;
123 CHECK(!can_be_slow);
124 yield();
125 migrate(slow_scheduler_id_);
126 slow_scheduler_id_ = -1;
127 return;
128 }
129 live_event();
130 state_ = State::Write;
131 LOG(INFO) << res.error();
132 HttpHeaderCreator hc;
133 hc.init_status_line(res.error().code());
134 hc.set_content_size(0);
135 write_buffer_.append(hc.finish().ok());
136 close_after_write_ = true;
137 on_error(Status::Error(res.error().public_message()));
138 } else if (res.ok() == 0) {
139 state_ = State::Write;
140 LOG(DEBUG) << "Send query to handler";
141 live_event();
142 current_query_->peer_address_ = peer_address_;
143 on_query(std::move(current_query_));
144 } else {
145 want_read = true;
146 }
147 }
148
149 write_source_.wakeup();
150
151 if (can_write_local(fd_)) {
152 LOG(DEBUG) << "Can write to the connection";
153 auto r = fd_.flush_write();
154 if (r.is_error()) {
155 LOG(INFO) << "Receive flush_write error: " << r.error();
156 on_error(Status::Error(r.error().public_message()));
157 }
158 if (close_after_write_ && !fd_.need_flush_write()) {
159 return stop();
160 }
161 }
162
163 Status pending_error;
164 if (fd_.get_poll_info().get_flags_local().has_pending_error()) {
165 pending_error = fd_.get_pending_error();
166 }
167 if (pending_error.is_ok() && write_sink_.status().is_error()) {
168 pending_error = std::move(write_sink_.status());
169 }
170 if (pending_error.is_ok() && read_sink_.status().is_error()) {
171 pending_error = std::move(read_sink_.status());
172 }
173 if (pending_error.is_error()) {
174 LOG(INFO) << pending_error;
175 if (!close_after_write_) {
176 on_error(Status::Error(pending_error.public_message()));
177 }
178 state_ = State::Close;
179 }
180
181 if (can_close_local(fd_)) {
182 LOG(DEBUG) << "Can close the connection";
183 state_ = State::Close;
184 }
185 if (state_ == State::Close) {
186 if (fd_.need_flush_write()) {
187 LOG(INFO) << "Close nonempty connection";
188 }
189 if (want_read && (!fd_.input_buffer().empty() || current_query_->type_ != HttpQuery::Type::Empty)) {
190 LOG(INFO) << "Close connection while reading request/response";
191 }
192 return stop();
193 }
194 }
195
on_start_migrate(int32 sched_id)196 void HttpConnectionBase::on_start_migrate(int32 sched_id) {
197 Scheduler::unsubscribe(fd_.get_poll_info().get_pollable_fd_ref());
198 }
199
on_finish_migrate()200 void HttpConnectionBase::on_finish_migrate() {
201 Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this));
202 live_event();
203 }
204
205 } // namespace detail
206 } // namespace td
207