1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_downstream.h"
26 
27 #include <cassert>
28 
29 #include "url-parser/url_parser.h"
30 
31 #include "shrpx_upstream.h"
32 #include "shrpx_client_handler.h"
33 #include "shrpx_config.h"
34 #include "shrpx_error.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_downstream_queue.h"
37 #include "shrpx_worker.h"
38 #include "shrpx_http2_session.h"
39 #include "shrpx_log.h"
40 #ifdef HAVE_MRUBY
41 #  include "shrpx_mruby.h"
42 #endif // HAVE_MRUBY
43 #include "util.h"
44 #include "http2.h"
45 
46 namespace shrpx {
47 
48 namespace {
upstream_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)49 void upstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
50   auto downstream = static_cast<Downstream *>(w->data);
51   auto upstream = downstream->get_upstream();
52 
53   auto which = revents == EV_READ ? "read" : "write";
54 
55   if (LOG_ENABLED(INFO)) {
56     DLOG(INFO, downstream) << "upstream timeout stream_id="
57                            << downstream->get_stream_id() << " event=" << which;
58   }
59 
60   downstream->disable_upstream_rtimer();
61   downstream->disable_upstream_wtimer();
62 
63   upstream->on_timeout(downstream);
64 }
65 } // namespace
66 
67 namespace {
upstream_rtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)68 void upstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
69   upstream_timeoutcb(loop, w, EV_READ);
70 }
71 } // namespace
72 
73 namespace {
upstream_wtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)74 void upstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
75   upstream_timeoutcb(loop, w, EV_WRITE);
76 }
77 } // namespace
78 
79 namespace {
downstream_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)80 void downstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
81   auto downstream = static_cast<Downstream *>(w->data);
82 
83   auto which = revents == EV_READ ? "read" : "write";
84 
85   if (LOG_ENABLED(INFO)) {
86     DLOG(INFO, downstream) << "downstream timeout stream_id="
87                            << downstream->get_downstream_stream_id()
88                            << " event=" << which;
89   }
90 
91   downstream->disable_downstream_rtimer();
92   downstream->disable_downstream_wtimer();
93 
94   auto dconn = downstream->get_downstream_connection();
95 
96   if (dconn) {
97     dconn->on_timeout();
98   }
99 }
100 } // namespace
101 
102 namespace {
downstream_rtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)103 void downstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
104   downstream_timeoutcb(loop, w, EV_READ);
105 }
106 } // namespace
107 
108 namespace {
downstream_wtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)109 void downstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
110   downstream_timeoutcb(loop, w, EV_WRITE);
111 }
112 } // namespace
113 
114 // upstream could be nullptr for unittests
Downstream(Upstream * upstream,MemchunkPool * mcpool,int32_t stream_id)115 Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
116                        int32_t stream_id)
117     : dlnext(nullptr),
118       dlprev(nullptr),
119       response_sent_body_length(0),
120       balloc_(1024, 1024),
121       req_(balloc_),
122       resp_(balloc_),
123       request_start_time_(std::chrono::high_resolution_clock::now()),
124       blocked_request_buf_(mcpool),
125       request_buf_(mcpool),
126       response_buf_(mcpool),
127       upstream_(upstream),
128       blocked_link_(nullptr),
129       addr_(nullptr),
130       num_retry_(0),
131       stream_id_(stream_id),
132       assoc_stream_id_(-1),
133       downstream_stream_id_(-1),
134       response_rst_stream_error_code_(NGHTTP2_NO_ERROR),
135       affinity_cookie_(0),
136       request_state_(DownstreamState::INITIAL),
137       response_state_(DownstreamState::INITIAL),
138       dispatch_state_(DispatchState::NONE),
139       upgraded_(false),
140       chunked_request_(false),
141       chunked_response_(false),
142       expect_final_response_(false),
143       request_pending_(false),
144       request_header_sent_(false),
145       accesslog_written_(false),
146       new_affinity_cookie_(false),
147       blocked_request_data_eof_(false),
148       expect_100_continue_(false) {
149 
150   auto &timeoutconf = get_config()->http2.timeout;
151 
152   ev_timer_init(&upstream_rtimer_, &upstream_rtimeoutcb, 0.,
153                 timeoutconf.stream_read);
154   ev_timer_init(&upstream_wtimer_, &upstream_wtimeoutcb, 0.,
155                 timeoutconf.stream_write);
156   ev_timer_init(&downstream_rtimer_, &downstream_rtimeoutcb, 0.,
157                 timeoutconf.stream_read);
158   ev_timer_init(&downstream_wtimer_, &downstream_wtimeoutcb, 0.,
159                 timeoutconf.stream_write);
160 
161   upstream_rtimer_.data = this;
162   upstream_wtimer_.data = this;
163   downstream_rtimer_.data = this;
164   downstream_wtimer_.data = this;
165 
166   rcbufs_.reserve(32);
167 }
168 
~Downstream()169 Downstream::~Downstream() {
170   if (LOG_ENABLED(INFO)) {
171     DLOG(INFO, this) << "Deleting";
172   }
173 
174   // check nullptr for unittest
175   if (upstream_) {
176     auto loop = upstream_->get_client_handler()->get_loop();
177 
178     ev_timer_stop(loop, &upstream_rtimer_);
179     ev_timer_stop(loop, &upstream_wtimer_);
180     ev_timer_stop(loop, &downstream_rtimer_);
181     ev_timer_stop(loop, &downstream_wtimer_);
182 
183 #ifdef HAVE_MRUBY
184     auto handler = upstream_->get_client_handler();
185     auto worker = handler->get_worker();
186     auto mruby_ctx = worker->get_mruby_context();
187 
188     mruby_ctx->delete_downstream(this);
189 #endif // HAVE_MRUBY
190   }
191 
192 #ifdef HAVE_MRUBY
193   if (dconn_) {
194     const auto &group = dconn_->get_downstream_addr_group();
195     if (group) {
196       const auto &mruby_ctx = group->shared_addr->mruby_ctx;
197       mruby_ctx->delete_downstream(this);
198     }
199   }
200 #endif // HAVE_MRUBY
201 
202   // DownstreamConnection may refer to this object.  Delete it now
203   // explicitly.
204   dconn_.reset();
205 
206   for (auto rcbuf : rcbufs_) {
207     nghttp2_rcbuf_decref(rcbuf);
208   }
209 
210   if (LOG_ENABLED(INFO)) {
211     DLOG(INFO, this) << "Deleted";
212   }
213 }
214 
attach_downstream_connection(std::unique_ptr<DownstreamConnection> dconn)215 int Downstream::attach_downstream_connection(
216     std::unique_ptr<DownstreamConnection> dconn) {
217   if (dconn->attach_downstream(this) != 0) {
218     return -1;
219   }
220 
221   dconn_ = std::move(dconn);
222 
223   return 0;
224 }
225 
detach_downstream_connection()226 void Downstream::detach_downstream_connection() {
227   if (!dconn_) {
228     return;
229   }
230 
231 #ifdef HAVE_MRUBY
232   const auto &group = dconn_->get_downstream_addr_group();
233   if (group) {
234     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
235     mruby_ctx->delete_downstream(this);
236   }
237 #endif // HAVE_MRUBY
238 
239   dconn_->detach_downstream(this);
240 
241   auto handler = dconn_->get_client_handler();
242 
243   handler->pool_downstream_connection(
244       std::unique_ptr<DownstreamConnection>(dconn_.release()));
245 }
246 
get_downstream_connection()247 DownstreamConnection *Downstream::get_downstream_connection() {
248   return dconn_.get();
249 }
250 
pop_downstream_connection()251 std::unique_ptr<DownstreamConnection> Downstream::pop_downstream_connection() {
252 #ifdef HAVE_MRUBY
253   if (!dconn_) {
254     return nullptr;
255   }
256 
257   const auto &group = dconn_->get_downstream_addr_group();
258   if (group) {
259     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
260     mruby_ctx->delete_downstream(this);
261   }
262 #endif // HAVE_MRUBY
263 
264   return std::unique_ptr<DownstreamConnection>(dconn_.release());
265 }
266 
pause_read(IOCtrlReason reason)267 void Downstream::pause_read(IOCtrlReason reason) {
268   if (dconn_) {
269     dconn_->pause_read(reason);
270   }
271 }
272 
resume_read(IOCtrlReason reason,size_t consumed)273 int Downstream::resume_read(IOCtrlReason reason, size_t consumed) {
274   if (dconn_) {
275     return dconn_->resume_read(reason, consumed);
276   }
277 
278   return 0;
279 }
280 
force_resume_read()281 void Downstream::force_resume_read() {
282   if (dconn_) {
283     dconn_->force_resume_read();
284   }
285 }
286 
287 namespace {
288 const HeaderRefs::value_type *
search_header_linear_backwards(const HeaderRefs & headers,const StringRef & name)289 search_header_linear_backwards(const HeaderRefs &headers,
290                                const StringRef &name) {
291   for (auto it = headers.rbegin(); it != headers.rend(); ++it) {
292     auto &kv = *it;
293     if (kv.name == name) {
294       return &kv;
295     }
296   }
297   return nullptr;
298 }
299 } // namespace
300 
assemble_request_cookie()301 StringRef Downstream::assemble_request_cookie() {
302   size_t len = 0;
303 
304   for (auto &kv : req_.fs.headers()) {
305     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
306       continue;
307     }
308 
309     len += kv.value.size() + str_size("; ");
310   }
311 
312   auto iov = make_byte_ref(balloc_, len + 1);
313   auto p = iov.base;
314 
315   for (auto &kv : req_.fs.headers()) {
316     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
317       continue;
318     }
319 
320     auto end = std::end(kv.value);
321     for (auto it = std::begin(kv.value) + kv.value.size();
322          it != std::begin(kv.value); --it) {
323       auto c = *(it - 1);
324       if (c == ' ' || c == ';') {
325         continue;
326       }
327       end = it;
328       break;
329     }
330 
331     p = std::copy(std::begin(kv.value), end, p);
332     p = util::copy_lit(p, "; ");
333   }
334 
335   // cut trailing "; "
336   if (p - iov.base >= 2) {
337     p -= 2;
338   }
339 
340   return StringRef{iov.base, p};
341 }
342 
find_affinity_cookie(const StringRef & name)343 uint32_t Downstream::find_affinity_cookie(const StringRef &name) {
344   for (auto &kv : req_.fs.headers()) {
345     if (kv.token != http2::HD_COOKIE) {
346       continue;
347     }
348 
349     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
350       if (*it == '\t' || *it == ' ' || *it == ';') {
351         ++it;
352         continue;
353       }
354 
355       auto end = std::find(it, std::end(kv.value), '=');
356       if (end == std::end(kv.value)) {
357         return 0;
358       }
359 
360       if (!util::streq(name, StringRef{it, end})) {
361         it = std::find(it, std::end(kv.value), ';');
362         continue;
363       }
364 
365       it = std::find(end + 1, std::end(kv.value), ';');
366       auto val = StringRef{end + 1, it};
367       if (val.size() != 8) {
368         return 0;
369       }
370       uint32_t h = 0;
371       for (auto c : val) {
372         auto n = util::hex_to_uint(c);
373         if (n == 256) {
374           return 0;
375         }
376         h <<= 4;
377         h += n;
378       }
379       affinity_cookie_ = h;
380       return h;
381     }
382   }
383   return 0;
384 }
385 
count_crumble_request_cookie()386 size_t Downstream::count_crumble_request_cookie() {
387   size_t n = 0;
388   for (auto &kv : req_.fs.headers()) {
389     if (kv.token != http2::HD_COOKIE) {
390       continue;
391     }
392 
393     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
394       if (*it == '\t' || *it == ' ' || *it == ';') {
395         ++it;
396         continue;
397       }
398 
399       it = std::find(it, std::end(kv.value), ';');
400 
401       ++n;
402     }
403   }
404   return n;
405 }
406 
crumble_request_cookie(std::vector<nghttp2_nv> & nva)407 void Downstream::crumble_request_cookie(std::vector<nghttp2_nv> &nva) {
408   for (auto &kv : req_.fs.headers()) {
409     if (kv.token != http2::HD_COOKIE) {
410       continue;
411     }
412 
413     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
414       if (*it == '\t' || *it == ' ' || *it == ';') {
415         ++it;
416         continue;
417       }
418 
419       auto first = it;
420 
421       it = std::find(it, std::end(kv.value), ';');
422 
423       nva.push_back({(uint8_t *)"cookie", (uint8_t *)first, str_size("cookie"),
424                      (size_t)(it - first),
425                      (uint8_t)(NGHTTP2_NV_FLAG_NO_COPY_NAME |
426                                NGHTTP2_NV_FLAG_NO_COPY_VALUE |
427                                (kv.no_index ? NGHTTP2_NV_FLAG_NO_INDEX : 0))});
428     }
429   }
430 }
431 
432 namespace {
add_header(size_t & sum,HeaderRefs & headers,const StringRef & name,const StringRef & value,bool no_index,int32_t token)433 void add_header(size_t &sum, HeaderRefs &headers, const StringRef &name,
434                 const StringRef &value, bool no_index, int32_t token) {
435   sum += name.size() + value.size();
436   headers.emplace_back(name, value, no_index, token);
437 }
438 } // namespace
439 
440 namespace {
alloc_header_name(BlockAllocator & balloc,const StringRef & name)441 StringRef alloc_header_name(BlockAllocator &balloc, const StringRef &name) {
442   auto iov = make_byte_ref(balloc, name.size() + 1);
443   auto p = iov.base;
444   p = std::copy(std::begin(name), std::end(name), p);
445   util::inp_strlower(iov.base, p);
446   *p = '\0';
447 
448   return StringRef{iov.base, p};
449 }
450 } // namespace
451 
452 namespace {
append_last_header_key(BlockAllocator & balloc,bool & key_prev,size_t & sum,HeaderRefs & headers,const char * data,size_t len)453 void append_last_header_key(BlockAllocator &balloc, bool &key_prev, size_t &sum,
454                             HeaderRefs &headers, const char *data, size_t len) {
455   assert(key_prev);
456   sum += len;
457   auto &item = headers.back();
458   auto name =
459       realloc_concat_string_ref(balloc, item.name, StringRef{data, len});
460 
461   auto p = const_cast<uint8_t *>(name.byte());
462   util::inp_strlower(p + name.size() - len, p + name.size());
463 
464   item.name = name;
465   item.token = http2::lookup_token(item.name);
466 }
467 } // namespace
468 
469 namespace {
append_last_header_value(BlockAllocator & balloc,bool & key_prev,size_t & sum,HeaderRefs & headers,const char * data,size_t len)470 void append_last_header_value(BlockAllocator &balloc, bool &key_prev,
471                               size_t &sum, HeaderRefs &headers,
472                               const char *data, size_t len) {
473   key_prev = false;
474   sum += len;
475   auto &item = headers.back();
476   item.value =
477       realloc_concat_string_ref(balloc, item.value, StringRef{data, len});
478 }
479 } // namespace
480 
parse_content_length()481 int FieldStore::parse_content_length() {
482   content_length = -1;
483 
484   for (auto &kv : headers_) {
485     if (kv.token != http2::HD_CONTENT_LENGTH) {
486       continue;
487     }
488 
489     auto len = util::parse_uint(kv.value);
490     if (len == -1) {
491       return -1;
492     }
493     if (content_length != -1) {
494       return -1;
495     }
496     content_length = len;
497   }
498   return 0;
499 }
500 
header(int32_t token) const501 const HeaderRefs::value_type *FieldStore::header(int32_t token) const {
502   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
503     auto &kv = *it;
504     if (kv.token == token) {
505       return &kv;
506     }
507   }
508   return nullptr;
509 }
510 
header(int32_t token)511 HeaderRefs::value_type *FieldStore::header(int32_t token) {
512   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
513     auto &kv = *it;
514     if (kv.token == token) {
515       return &kv;
516     }
517   }
518   return nullptr;
519 }
520 
header(const StringRef & name) const521 const HeaderRefs::value_type *FieldStore::header(const StringRef &name) const {
522   return search_header_linear_backwards(headers_, name);
523 }
524 
add_header_token(const StringRef & name,const StringRef & value,bool no_index,int32_t token)525 void FieldStore::add_header_token(const StringRef &name, const StringRef &value,
526                                   bool no_index, int32_t token) {
527   shrpx::add_header(buffer_size_, headers_, name, value, no_index, token);
528 }
529 
alloc_add_header_name(const StringRef & name)530 void FieldStore::alloc_add_header_name(const StringRef &name) {
531   auto name_ref = alloc_header_name(balloc_, name);
532   auto token = http2::lookup_token(name_ref);
533   add_header_token(name_ref, StringRef{}, false, token);
534   header_key_prev_ = true;
535 }
536 
append_last_header_key(const char * data,size_t len)537 void FieldStore::append_last_header_key(const char *data, size_t len) {
538   shrpx::append_last_header_key(balloc_, header_key_prev_, buffer_size_,
539                                 headers_, data, len);
540 }
541 
append_last_header_value(const char * data,size_t len)542 void FieldStore::append_last_header_value(const char *data, size_t len) {
543   shrpx::append_last_header_value(balloc_, header_key_prev_, buffer_size_,
544                                   headers_, data, len);
545 }
546 
clear_headers()547 void FieldStore::clear_headers() {
548   headers_.clear();
549   header_key_prev_ = false;
550 }
551 
add_trailer_token(const StringRef & name,const StringRef & value,bool no_index,int32_t token)552 void FieldStore::add_trailer_token(const StringRef &name,
553                                    const StringRef &value, bool no_index,
554                                    int32_t token) {
555   // Header size limit should be applied to all header and trailer
556   // fields combined.
557   shrpx::add_header(buffer_size_, trailers_, name, value, no_index, token);
558 }
559 
alloc_add_trailer_name(const StringRef & name)560 void FieldStore::alloc_add_trailer_name(const StringRef &name) {
561   auto name_ref = alloc_header_name(balloc_, name);
562   auto token = http2::lookup_token(name_ref);
563   add_trailer_token(name_ref, StringRef{}, false, token);
564   trailer_key_prev_ = true;
565 }
566 
append_last_trailer_key(const char * data,size_t len)567 void FieldStore::append_last_trailer_key(const char *data, size_t len) {
568   shrpx::append_last_header_key(balloc_, trailer_key_prev_, buffer_size_,
569                                 trailers_, data, len);
570 }
571 
append_last_trailer_value(const char * data,size_t len)572 void FieldStore::append_last_trailer_value(const char *data, size_t len) {
573   shrpx::append_last_header_value(balloc_, trailer_key_prev_, buffer_size_,
574                                   trailers_, data, len);
575 }
576 
erase_content_length_and_transfer_encoding()577 void FieldStore::erase_content_length_and_transfer_encoding() {
578   for (auto &kv : headers_) {
579     switch (kv.token) {
580     case http2::HD_CONTENT_LENGTH:
581     case http2::HD_TRANSFER_ENCODING:
582       kv.name = StringRef{};
583       kv.token = -1;
584       break;
585     }
586   }
587 }
588 
set_request_start_time(std::chrono::high_resolution_clock::time_point time)589 void Downstream::set_request_start_time(
590     std::chrono::high_resolution_clock::time_point time) {
591   request_start_time_ = std::move(time);
592 }
593 
594 const std::chrono::high_resolution_clock::time_point &
get_request_start_time() const595 Downstream::get_request_start_time() const {
596   return request_start_time_;
597 }
598 
reset_upstream(Upstream * upstream)599 void Downstream::reset_upstream(Upstream *upstream) {
600   upstream_ = upstream;
601   if (dconn_) {
602     dconn_->on_upstream_change(upstream);
603   }
604 }
605 
get_upstream() const606 Upstream *Downstream::get_upstream() const { return upstream_; }
607 
set_stream_id(int32_t stream_id)608 void Downstream::set_stream_id(int32_t stream_id) { stream_id_ = stream_id; }
609 
get_stream_id() const610 int32_t Downstream::get_stream_id() const { return stream_id_; }
611 
set_request_state(DownstreamState state)612 void Downstream::set_request_state(DownstreamState state) {
613   request_state_ = state;
614 }
615 
get_request_state() const616 DownstreamState Downstream::get_request_state() const { return request_state_; }
617 
get_chunked_request() const618 bool Downstream::get_chunked_request() const { return chunked_request_; }
619 
set_chunked_request(bool f)620 void Downstream::set_chunked_request(bool f) { chunked_request_ = f; }
621 
request_buf_full()622 bool Downstream::request_buf_full() {
623   auto handler = upstream_->get_client_handler();
624   auto faddr = handler->get_upstream_addr();
625   auto worker = handler->get_worker();
626 
627   // We don't check buffer size here for API endpoint.
628   if (faddr->alt_mode == UpstreamAltMode::API) {
629     return false;
630   }
631 
632   if (dconn_) {
633     auto &downstreamconf = *worker->get_downstream_config();
634     return blocked_request_buf_.rleft() + request_buf_.rleft() >=
635            downstreamconf.request_buffer_size;
636   }
637 
638   return false;
639 }
640 
get_request_buf()641 DefaultMemchunks *Downstream::get_request_buf() { return &request_buf_; }
642 
643 // Call this function after this object is attached to
644 // Downstream. Otherwise, the program will crash.
push_request_headers()645 int Downstream::push_request_headers() {
646   if (!dconn_) {
647     DLOG(INFO, this) << "dconn_ is NULL";
648     return -1;
649   }
650   return dconn_->push_request_headers();
651 }
652 
push_upload_data_chunk(const uint8_t * data,size_t datalen)653 int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) {
654   req_.recv_body_length += datalen;
655 
656   if (!dconn_ && !request_header_sent_) {
657     blocked_request_buf_.append(data, datalen);
658     req_.unconsumed_body_length += datalen;
659     return 0;
660   }
661 
662   // Assumes that request headers have already been pushed to output
663   // buffer using push_request_headers().
664   if (!dconn_) {
665     DLOG(INFO, this) << "dconn_ is NULL";
666     return -1;
667   }
668   if (dconn_->push_upload_data_chunk(data, datalen) != 0) {
669     return -1;
670   }
671 
672   req_.unconsumed_body_length += datalen;
673 
674   return 0;
675 }
676 
end_upload_data()677 int Downstream::end_upload_data() {
678   if (!dconn_ && !request_header_sent_) {
679     blocked_request_data_eof_ = true;
680     return 0;
681   }
682   if (!dconn_) {
683     DLOG(INFO, this) << "dconn_ is NULL";
684     return -1;
685   }
686   return dconn_->end_upload_data();
687 }
688 
rewrite_location_response_header(const StringRef & upstream_scheme)689 void Downstream::rewrite_location_response_header(
690     const StringRef &upstream_scheme) {
691   auto hd = resp_.fs.header(http2::HD_LOCATION);
692   if (!hd) {
693     return;
694   }
695 
696   if (request_downstream_host_.empty() || req_.authority.empty()) {
697     return;
698   }
699 
700   http_parser_url u{};
701   auto rv = http_parser_parse_url(hd->value.c_str(), hd->value.size(), 0, &u);
702   if (rv != 0) {
703     return;
704   }
705 
706   auto new_uri = http2::rewrite_location_uri(balloc_, hd->value, u,
707                                              request_downstream_host_,
708                                              req_.authority, upstream_scheme);
709 
710   if (new_uri.empty()) {
711     return;
712   }
713 
714   hd->value = new_uri;
715 }
716 
get_chunked_response() const717 bool Downstream::get_chunked_response() const { return chunked_response_; }
718 
set_chunked_response(bool f)719 void Downstream::set_chunked_response(bool f) { chunked_response_ = f; }
720 
on_read()721 int Downstream::on_read() {
722   if (!dconn_) {
723     DLOG(INFO, this) << "dconn_ is NULL";
724     return -1;
725   }
726   return dconn_->on_read();
727 }
728 
set_response_state(DownstreamState state)729 void Downstream::set_response_state(DownstreamState state) {
730   response_state_ = state;
731 }
732 
get_response_state() const733 DownstreamState Downstream::get_response_state() const {
734   return response_state_;
735 }
736 
get_response_buf()737 DefaultMemchunks *Downstream::get_response_buf() { return &response_buf_; }
738 
response_buf_full()739 bool Downstream::response_buf_full() {
740   if (dconn_) {
741     auto handler = upstream_->get_client_handler();
742     auto worker = handler->get_worker();
743     auto &downstreamconf = *worker->get_downstream_config();
744 
745     return response_buf_.rleft() >= downstreamconf.response_buffer_size;
746   }
747 
748   return false;
749 }
750 
validate_request_recv_body_length() const751 bool Downstream::validate_request_recv_body_length() const {
752   if (req_.fs.content_length == -1) {
753     return true;
754   }
755 
756   if (req_.fs.content_length != req_.recv_body_length) {
757     if (LOG_ENABLED(INFO)) {
758       DLOG(INFO, this) << "request invalid bodylen: content-length="
759                        << req_.fs.content_length
760                        << ", received=" << req_.recv_body_length;
761     }
762     return false;
763   }
764 
765   return true;
766 }
767 
validate_response_recv_body_length() const768 bool Downstream::validate_response_recv_body_length() const {
769   if (!expect_response_body() || resp_.fs.content_length == -1) {
770     return true;
771   }
772 
773   if (resp_.fs.content_length != resp_.recv_body_length) {
774     if (LOG_ENABLED(INFO)) {
775       DLOG(INFO, this) << "response invalid bodylen: content-length="
776                        << resp_.fs.content_length
777                        << ", received=" << resp_.recv_body_length;
778     }
779     return false;
780   }
781 
782   return true;
783 }
784 
check_upgrade_fulfilled_http2()785 void Downstream::check_upgrade_fulfilled_http2() {
786   // This handles nonzero req_.connect_proto and h1 frontend requests
787   // WebSocket upgrade.
788   upgraded_ = (req_.method == HTTP_CONNECT ||
789                req_.connect_proto == ConnectProto::WEBSOCKET) &&
790               resp_.http_status / 100 == 2;
791 }
792 
check_upgrade_fulfilled_http1()793 void Downstream::check_upgrade_fulfilled_http1() {
794   if (req_.method == HTTP_CONNECT) {
795     if (req_.connect_proto == ConnectProto::WEBSOCKET) {
796       if (resp_.http_status != 101) {
797         return;
798       }
799 
800       // This is done for HTTP/2 frontend only.
801       auto accept = resp_.fs.header(http2::HD_SEC_WEBSOCKET_ACCEPT);
802       if (!accept) {
803         return;
804       }
805 
806       std::array<uint8_t, base64::encode_length(20)> accept_buf;
807       auto expected =
808           http2::make_websocket_accept_token(accept_buf.data(), ws_key_);
809 
810       upgraded_ = expected != "" && expected == accept->value;
811     } else {
812       upgraded_ = resp_.http_status / 100 == 2;
813     }
814 
815     return;
816   }
817 
818   if (resp_.http_status == 101) {
819     // TODO Do more strict checking for upgrade headers
820     upgraded_ = req_.upgrade_request;
821 
822     return;
823   }
824 }
825 
inspect_http2_request()826 void Downstream::inspect_http2_request() {
827   if (req_.method == HTTP_CONNECT) {
828     req_.upgrade_request = true;
829   }
830 }
831 
inspect_http1_request()832 void Downstream::inspect_http1_request() {
833   if (req_.method == HTTP_CONNECT) {
834     req_.upgrade_request = true;
835   } else if (req_.http_minor > 0) {
836     auto upgrade = req_.fs.header(http2::HD_UPGRADE);
837     if (upgrade) {
838       const auto &val = upgrade->value;
839       // TODO Perform more strict checking for upgrade headers
840       if (util::streq_l(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID, val.c_str(),
841                         val.size())) {
842         req_.http2_upgrade_seen = true;
843       } else {
844         req_.upgrade_request = true;
845 
846         // TODO Should we check Sec-WebSocket-Key, and
847         // Sec-WebSocket-Version as well?
848         if (util::strieq_l("websocket", val)) {
849           req_.connect_proto = ConnectProto::WEBSOCKET;
850         }
851       }
852     }
853   }
854   auto transfer_encoding = req_.fs.header(http2::HD_TRANSFER_ENCODING);
855   if (transfer_encoding) {
856     req_.fs.content_length = -1;
857     if (util::iends_with_l(transfer_encoding->value, "chunked")) {
858       chunked_request_ = true;
859     }
860   }
861 
862   auto expect = req_.fs.header(http2::HD_EXPECT);
863   expect_100_continue_ =
864       expect &&
865       util::strieq(expect->value, StringRef::from_lit("100-continue"));
866 }
867 
inspect_http1_response()868 void Downstream::inspect_http1_response() {
869   auto transfer_encoding = resp_.fs.header(http2::HD_TRANSFER_ENCODING);
870   if (transfer_encoding) {
871     resp_.fs.content_length = -1;
872     if (util::iends_with_l(transfer_encoding->value, "chunked")) {
873       chunked_response_ = true;
874     }
875   }
876 }
877 
reset_response()878 void Downstream::reset_response() {
879   resp_.http_status = 0;
880   resp_.http_major = 1;
881   resp_.http_minor = 1;
882 }
883 
get_non_final_response() const884 bool Downstream::get_non_final_response() const {
885   return !upgraded_ && resp_.http_status / 100 == 1;
886 }
887 
supports_non_final_response() const888 bool Downstream::supports_non_final_response() const {
889   return req_.http_major == 2 || (req_.http_major == 1 && req_.http_minor == 1);
890 }
891 
get_upgraded() const892 bool Downstream::get_upgraded() const { return upgraded_; }
893 
get_http2_upgrade_request() const894 bool Downstream::get_http2_upgrade_request() const {
895   return req_.http2_upgrade_seen && req_.fs.header(http2::HD_HTTP2_SETTINGS) &&
896          response_state_ == DownstreamState::INITIAL;
897 }
898 
get_http2_settings() const899 StringRef Downstream::get_http2_settings() const {
900   auto http2_settings = req_.fs.header(http2::HD_HTTP2_SETTINGS);
901   if (!http2_settings) {
902     return StringRef{};
903   }
904   return http2_settings->value;
905 }
906 
set_downstream_stream_id(int32_t stream_id)907 void Downstream::set_downstream_stream_id(int32_t stream_id) {
908   downstream_stream_id_ = stream_id;
909 }
910 
get_downstream_stream_id() const911 int32_t Downstream::get_downstream_stream_id() const {
912   return downstream_stream_id_;
913 }
914 
get_response_rst_stream_error_code() const915 uint32_t Downstream::get_response_rst_stream_error_code() const {
916   return response_rst_stream_error_code_;
917 }
918 
set_response_rst_stream_error_code(uint32_t error_code)919 void Downstream::set_response_rst_stream_error_code(uint32_t error_code) {
920   response_rst_stream_error_code_ = error_code;
921 }
922 
set_expect_final_response(bool f)923 void Downstream::set_expect_final_response(bool f) {
924   expect_final_response_ = f;
925 }
926 
get_expect_final_response() const927 bool Downstream::get_expect_final_response() const {
928   return expect_final_response_;
929 }
930 
expect_response_body() const931 bool Downstream::expect_response_body() const {
932   return !resp_.headers_only &&
933          http2::expect_response_body(req_.method, resp_.http_status);
934 }
935 
expect_response_trailer() const936 bool Downstream::expect_response_trailer() const {
937   // In HTTP/2, if final response HEADERS does not bear END_STREAM it
938   // is possible trailer fields might come, regardless of request
939   // method or status code.
940   return !resp_.headers_only && resp_.http_major == 2;
941 }
942 
943 namespace {
reset_timer(struct ev_loop * loop,ev_timer * w)944 void reset_timer(struct ev_loop *loop, ev_timer *w) { ev_timer_again(loop, w); }
945 } // namespace
946 
947 namespace {
try_reset_timer(struct ev_loop * loop,ev_timer * w)948 void try_reset_timer(struct ev_loop *loop, ev_timer *w) {
949   if (!ev_is_active(w)) {
950     return;
951   }
952   ev_timer_again(loop, w);
953 }
954 } // namespace
955 
956 namespace {
ensure_timer(struct ev_loop * loop,ev_timer * w)957 void ensure_timer(struct ev_loop *loop, ev_timer *w) {
958   if (ev_is_active(w)) {
959     return;
960   }
961   ev_timer_again(loop, w);
962 }
963 } // namespace
964 
965 namespace {
disable_timer(struct ev_loop * loop,ev_timer * w)966 void disable_timer(struct ev_loop *loop, ev_timer *w) {
967   ev_timer_stop(loop, w);
968 }
969 } // namespace
970 
reset_upstream_rtimer()971 void Downstream::reset_upstream_rtimer() {
972   if (get_config()->http2.timeout.stream_read == 0.) {
973     return;
974   }
975   auto loop = upstream_->get_client_handler()->get_loop();
976   reset_timer(loop, &upstream_rtimer_);
977 }
978 
reset_upstream_wtimer()979 void Downstream::reset_upstream_wtimer() {
980   auto loop = upstream_->get_client_handler()->get_loop();
981   auto &timeoutconf = get_config()->http2.timeout;
982 
983   if (timeoutconf.stream_write != 0.) {
984     reset_timer(loop, &upstream_wtimer_);
985   }
986   if (timeoutconf.stream_read != 0.) {
987     try_reset_timer(loop, &upstream_rtimer_);
988   }
989 }
990 
ensure_upstream_wtimer()991 void Downstream::ensure_upstream_wtimer() {
992   if (get_config()->http2.timeout.stream_write == 0.) {
993     return;
994   }
995   auto loop = upstream_->get_client_handler()->get_loop();
996   ensure_timer(loop, &upstream_wtimer_);
997 }
998 
disable_upstream_rtimer()999 void Downstream::disable_upstream_rtimer() {
1000   if (get_config()->http2.timeout.stream_read == 0.) {
1001     return;
1002   }
1003   auto loop = upstream_->get_client_handler()->get_loop();
1004   disable_timer(loop, &upstream_rtimer_);
1005 }
1006 
disable_upstream_wtimer()1007 void Downstream::disable_upstream_wtimer() {
1008   if (get_config()->http2.timeout.stream_write == 0.) {
1009     return;
1010   }
1011   auto loop = upstream_->get_client_handler()->get_loop();
1012   disable_timer(loop, &upstream_wtimer_);
1013 }
1014 
reset_downstream_rtimer()1015 void Downstream::reset_downstream_rtimer() {
1016   if (get_config()->http2.timeout.stream_read == 0.) {
1017     return;
1018   }
1019   auto loop = upstream_->get_client_handler()->get_loop();
1020   reset_timer(loop, &downstream_rtimer_);
1021 }
1022 
reset_downstream_wtimer()1023 void Downstream::reset_downstream_wtimer() {
1024   auto loop = upstream_->get_client_handler()->get_loop();
1025   auto &timeoutconf = get_config()->http2.timeout;
1026 
1027   if (timeoutconf.stream_write != 0.) {
1028     reset_timer(loop, &downstream_wtimer_);
1029   }
1030   if (timeoutconf.stream_read != 0.) {
1031     try_reset_timer(loop, &downstream_rtimer_);
1032   }
1033 }
1034 
ensure_downstream_wtimer()1035 void Downstream::ensure_downstream_wtimer() {
1036   if (get_config()->http2.timeout.stream_write == 0.) {
1037     return;
1038   }
1039   auto loop = upstream_->get_client_handler()->get_loop();
1040   ensure_timer(loop, &downstream_wtimer_);
1041 }
1042 
disable_downstream_rtimer()1043 void Downstream::disable_downstream_rtimer() {
1044   if (get_config()->http2.timeout.stream_read == 0.) {
1045     return;
1046   }
1047   auto loop = upstream_->get_client_handler()->get_loop();
1048   disable_timer(loop, &downstream_rtimer_);
1049 }
1050 
disable_downstream_wtimer()1051 void Downstream::disable_downstream_wtimer() {
1052   if (get_config()->http2.timeout.stream_write == 0.) {
1053     return;
1054   }
1055   auto loop = upstream_->get_client_handler()->get_loop();
1056   disable_timer(loop, &downstream_wtimer_);
1057 }
1058 
accesslog_ready() const1059 bool Downstream::accesslog_ready() const {
1060   return !accesslog_written_ && resp_.http_status > 0;
1061 }
1062 
add_retry()1063 void Downstream::add_retry() { ++num_retry_; }
1064 
no_more_retry() const1065 bool Downstream::no_more_retry() const { return num_retry_ > 50; }
1066 
set_request_downstream_host(const StringRef & host)1067 void Downstream::set_request_downstream_host(const StringRef &host) {
1068   request_downstream_host_ = host;
1069 }
1070 
set_request_pending(bool f)1071 void Downstream::set_request_pending(bool f) { request_pending_ = f; }
1072 
get_request_pending() const1073 bool Downstream::get_request_pending() const { return request_pending_; }
1074 
set_request_header_sent(bool f)1075 void Downstream::set_request_header_sent(bool f) { request_header_sent_ = f; }
1076 
get_request_header_sent() const1077 bool Downstream::get_request_header_sent() const {
1078   return request_header_sent_;
1079 }
1080 
request_submission_ready() const1081 bool Downstream::request_submission_ready() const {
1082   return (request_state_ == DownstreamState::HEADER_COMPLETE ||
1083           request_state_ == DownstreamState::MSG_COMPLETE) &&
1084          (request_pending_ || !request_header_sent_) &&
1085          response_state_ == DownstreamState::INITIAL;
1086 }
1087 
get_dispatch_state() const1088 DispatchState Downstream::get_dispatch_state() const { return dispatch_state_; }
1089 
set_dispatch_state(DispatchState s)1090 void Downstream::set_dispatch_state(DispatchState s) { dispatch_state_ = s; }
1091 
attach_blocked_link(BlockedLink * l)1092 void Downstream::attach_blocked_link(BlockedLink *l) {
1093   assert(!blocked_link_);
1094 
1095   l->downstream = this;
1096   blocked_link_ = l;
1097 }
1098 
detach_blocked_link()1099 BlockedLink *Downstream::detach_blocked_link() {
1100   auto link = blocked_link_;
1101   blocked_link_ = nullptr;
1102   return link;
1103 }
1104 
can_detach_downstream_connection() const1105 bool Downstream::can_detach_downstream_connection() const {
1106   // We should check request and response buffer.  If request buffer
1107   // is not empty, then we might leave downstream connection in weird
1108   // state, especially for HTTP/1.1
1109   return dconn_ && response_state_ == DownstreamState::MSG_COMPLETE &&
1110          request_state_ == DownstreamState::MSG_COMPLETE && !upgraded_ &&
1111          !resp_.connection_close && request_buf_.rleft() == 0;
1112 }
1113 
pop_response_buf()1114 DefaultMemchunks Downstream::pop_response_buf() {
1115   return std::move(response_buf_);
1116 }
1117 
set_assoc_stream_id(int32_t stream_id)1118 void Downstream::set_assoc_stream_id(int32_t stream_id) {
1119   assoc_stream_id_ = stream_id;
1120 }
1121 
get_assoc_stream_id() const1122 int32_t Downstream::get_assoc_stream_id() const { return assoc_stream_id_; }
1123 
get_block_allocator()1124 BlockAllocator &Downstream::get_block_allocator() { return balloc_; }
1125 
add_rcbuf(nghttp2_rcbuf * rcbuf)1126 void Downstream::add_rcbuf(nghttp2_rcbuf *rcbuf) {
1127   nghttp2_rcbuf_incref(rcbuf);
1128   rcbufs_.push_back(rcbuf);
1129 }
1130 
set_downstream_addr_group(const std::shared_ptr<DownstreamAddrGroup> & group)1131 void Downstream::set_downstream_addr_group(
1132     const std::shared_ptr<DownstreamAddrGroup> &group) {
1133   group_ = group;
1134 }
1135 
set_addr(const DownstreamAddr * addr)1136 void Downstream::set_addr(const DownstreamAddr *addr) { addr_ = addr; }
1137 
get_addr() const1138 const DownstreamAddr *Downstream::get_addr() const { return addr_; }
1139 
set_accesslog_written(bool f)1140 void Downstream::set_accesslog_written(bool f) { accesslog_written_ = f; }
1141 
renew_affinity_cookie(uint32_t h)1142 void Downstream::renew_affinity_cookie(uint32_t h) {
1143   affinity_cookie_ = h;
1144   new_affinity_cookie_ = true;
1145 }
1146 
get_affinity_cookie_to_send() const1147 uint32_t Downstream::get_affinity_cookie_to_send() const {
1148   if (new_affinity_cookie_) {
1149     return affinity_cookie_;
1150   }
1151   return 0;
1152 }
1153 
get_blocked_request_buf()1154 DefaultMemchunks *Downstream::get_blocked_request_buf() {
1155   return &blocked_request_buf_;
1156 }
1157 
get_blocked_request_data_eof() const1158 bool Downstream::get_blocked_request_data_eof() const {
1159   return blocked_request_data_eof_;
1160 }
1161 
set_blocked_request_data_eof(bool f)1162 void Downstream::set_blocked_request_data_eof(bool f) {
1163   blocked_request_data_eof_ = f;
1164 }
1165 
set_ws_key(const StringRef & key)1166 void Downstream::set_ws_key(const StringRef &key) { ws_key_ = key; }
1167 
get_expect_100_continue() const1168 bool Downstream::get_expect_100_continue() const {
1169   return expect_100_continue_;
1170 }
1171 
1172 } // namespace shrpx
1173