//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
7 #include "td/net/HttpConnectionBase.h"
8 
9 #include "td/net/HttpHeaderCreator.h"
10 
11 #include "td/utils/common.h"
12 #include "td/utils/logging.h"
13 #include "td/utils/misc.h"
14 #include "td/utils/port/detail/PollableFd.h"
15 
16 namespace td {
17 namespace detail {
18 
HttpConnectionBase(State state,BufferedFd<SocketFd> fd,SslStream ssl_stream,size_t max_post_size,size_t max_files,int32 idle_timeout,int32 slow_scheduler_id)19 HttpConnectionBase::HttpConnectionBase(State state, BufferedFd<SocketFd> fd, SslStream ssl_stream, size_t max_post_size,
20                                        size_t max_files, int32 idle_timeout, int32 slow_scheduler_id)
21     : state_(state)
22     , fd_(std::move(fd))
23     , ssl_stream_(std::move(ssl_stream))
24     , max_post_size_(max_post_size)
25     , max_files_(max_files)
26     , idle_timeout_(idle_timeout)
27     , slow_scheduler_id_(slow_scheduler_id) {
28   CHECK(state_ != State::Close);
29 
30   if (ssl_stream_) {
31     read_source_ >> ssl_stream_.read_byte_flow() >> read_sink_;
32     write_source_ >> ssl_stream_.write_byte_flow() >> write_sink_;
33   } else {
34     read_source_ >> read_sink_;
35     write_source_ >> write_sink_;
36   }
37   peer_address_.init_peer_address(fd_).ignore();
38 }
39 
live_event()40 void HttpConnectionBase::live_event() {
41   if (idle_timeout_ != 0) {
42     set_timeout_in(idle_timeout_);
43   }
44 }
45 
start_up()46 void HttpConnectionBase::start_up() {
47   Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this));
48   reader_.init(read_sink_.get_output(), max_post_size_, max_files_);
49   if (state_ == State::Read) {
50     current_query_ = make_unique<HttpQuery>();
51   }
52   live_event();
53   yield();
54 }
tear_down()55 void HttpConnectionBase::tear_down() {
56   Scheduler::unsubscribe_before_close(fd_.get_poll_info().get_pollable_fd_ref());
57   fd_.close();
58 }
59 
write_next_noflush(BufferSlice buffer)60 void HttpConnectionBase::write_next_noflush(BufferSlice buffer) {
61   CHECK(state_ == State::Write);
62   write_buffer_.append(std::move(buffer));
63 }
write_next(BufferSlice buffer)64 void HttpConnectionBase::write_next(BufferSlice buffer) {
65   write_next_noflush(std::move(buffer));
66   loop();
67 }
68 
write_ok()69 void HttpConnectionBase::write_ok() {
70   CHECK(state_ == State::Write);
71   current_query_ = make_unique<HttpQuery>();
72   state_ = State::Read;
73   live_event();
74   loop();
75 }
76 
write_error(Status error)77 void HttpConnectionBase::write_error(Status error) {
78   CHECK(state_ == State::Write);
79   LOG(WARNING) << "Close HTTP connection: " << error;
80   state_ = State::Close;
81   loop();
82 }
83 
timeout_expired()84 void HttpConnectionBase::timeout_expired() {
85   LOG(INFO) << "Idle timeout expired";
86 
87   if (fd_.need_flush_write()) {
88     on_error(Status::Error("Write timeout expired"));
89   } else if (state_ == State::Read) {
90     on_error(Status::Error("Read timeout expired"));
91   }
92 
93   stop();
94 }
loop()95 void HttpConnectionBase::loop() {
96   if (ssl_stream_) {
97     //ssl_stream_.read_byte_flow().set_need_size(0);
98     ssl_stream_.write_byte_flow().reset_need_size();
99   }
100   sync_with_poll(fd_);
101   if (can_read_local(fd_)) {
102     LOG(DEBUG) << "Can read from the connection";
103     auto r = fd_.flush_read();
104     if (r.is_error()) {
105       if (!begins_with(r.error().message(), "SSL error {336134278")) {  // if error is not yet outputed
106         LOG(INFO) << "Receive flush_read error: " << r.error();
107       }
108       on_error(Status::Error(r.error().public_message()));
109       return stop();
110     }
111   }
112   read_source_.wakeup();
113 
114   // TODO: read_next even when state_ == State::Write
115 
116   bool want_read = false;
117   bool can_be_slow = slow_scheduler_id_ == -1;
118   if (state_ == State::Read) {
119     auto res = reader_.read_next(current_query_.get(), can_be_slow);
120     if (res.is_error()) {
121       if (res.error().message() == "SLOW") {
122         LOG(INFO) << "Slow HTTP connection: migrate to " << slow_scheduler_id_;
123         CHECK(!can_be_slow);
124         yield();
125         migrate(slow_scheduler_id_);
126         slow_scheduler_id_ = -1;
127         return;
128       }
129       live_event();
130       state_ = State::Write;
131       LOG(INFO) << res.error();
132       HttpHeaderCreator hc;
133       hc.init_status_line(res.error().code());
134       hc.set_content_size(0);
135       write_buffer_.append(hc.finish().ok());
136       close_after_write_ = true;
137       on_error(Status::Error(res.error().public_message()));
138     } else if (res.ok() == 0) {
139       state_ = State::Write;
140       LOG(DEBUG) << "Send query to handler";
141       live_event();
142       current_query_->peer_address_ = peer_address_;
143       on_query(std::move(current_query_));
144     } else {
145       want_read = true;
146     }
147   }
148 
149   write_source_.wakeup();
150 
151   if (can_write_local(fd_)) {
152     LOG(DEBUG) << "Can write to the connection";
153     auto r = fd_.flush_write();
154     if (r.is_error()) {
155       LOG(INFO) << "Receive flush_write error: " << r.error();
156       on_error(Status::Error(r.error().public_message()));
157     }
158     if (close_after_write_ && !fd_.need_flush_write()) {
159       return stop();
160     }
161   }
162 
163   Status pending_error;
164   if (fd_.get_poll_info().get_flags_local().has_pending_error()) {
165     pending_error = fd_.get_pending_error();
166   }
167   if (pending_error.is_ok() && write_sink_.status().is_error()) {
168     pending_error = std::move(write_sink_.status());
169   }
170   if (pending_error.is_ok() && read_sink_.status().is_error()) {
171     pending_error = std::move(read_sink_.status());
172   }
173   if (pending_error.is_error()) {
174     LOG(INFO) << pending_error;
175     if (!close_after_write_) {
176       on_error(Status::Error(pending_error.public_message()));
177     }
178     state_ = State::Close;
179   }
180 
181   if (can_close_local(fd_)) {
182     LOG(DEBUG) << "Can close the connection";
183     state_ = State::Close;
184   }
185   if (state_ == State::Close) {
186     if (fd_.need_flush_write()) {
187       LOG(INFO) << "Close nonempty connection";
188     }
189     if (want_read && (!fd_.input_buffer().empty() || current_query_->type_ != HttpQuery::Type::Empty)) {
190       LOG(INFO) << "Close connection while reading request/response";
191     }
192     return stop();
193   }
194 }
195 
on_start_migrate(int32 sched_id)196 void HttpConnectionBase::on_start_migrate(int32 sched_id) {
197   Scheduler::unsubscribe(fd_.get_poll_info().get_pollable_fd_ref());
198 }
199 
on_finish_migrate()200 void HttpConnectionBase::on_finish_migrate() {
201   Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this));
202   live_event();
203 }
204 
205 }  // namespace detail
206 }  // namespace td
207