1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_downstream.h"
26 
27 #include <cassert>
28 
29 #include "url-parser/url_parser.h"
30 
31 #include "shrpx_upstream.h"
32 #include "shrpx_client_handler.h"
33 #include "shrpx_config.h"
34 #include "shrpx_error.h"
35 #include "shrpx_downstream_connection.h"
36 #include "shrpx_downstream_queue.h"
37 #include "shrpx_worker.h"
38 #include "shrpx_http2_session.h"
39 #include "shrpx_log.h"
40 #ifdef HAVE_MRUBY
41 #  include "shrpx_mruby.h"
42 #endif // HAVE_MRUBY
43 #include "util.h"
44 #include "http2.h"
45 
46 namespace shrpx {
47 
48 namespace {
upstream_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)49 void upstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
50   auto downstream = static_cast<Downstream *>(w->data);
51   auto upstream = downstream->get_upstream();
52 
53   auto which = revents == EV_READ ? "read" : "write";
54 
55   if (LOG_ENABLED(INFO)) {
56     DLOG(INFO, downstream) << "upstream timeout stream_id="
57                            << downstream->get_stream_id() << " event=" << which;
58   }
59 
60   downstream->disable_upstream_rtimer();
61   downstream->disable_upstream_wtimer();
62 
63   upstream->on_timeout(downstream);
64 }
65 } // namespace
66 
67 namespace {
upstream_rtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)68 void upstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
69   upstream_timeoutcb(loop, w, EV_READ);
70 }
71 } // namespace
72 
73 namespace {
upstream_wtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)74 void upstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
75   upstream_timeoutcb(loop, w, EV_WRITE);
76 }
77 } // namespace
78 
79 namespace {
downstream_timeoutcb(struct ev_loop * loop,ev_timer * w,int revents)80 void downstream_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
81   auto downstream = static_cast<Downstream *>(w->data);
82 
83   auto which = revents == EV_READ ? "read" : "write";
84 
85   if (LOG_ENABLED(INFO)) {
86     DLOG(INFO, downstream) << "downstream timeout stream_id="
87                            << downstream->get_downstream_stream_id()
88                            << " event=" << which;
89   }
90 
91   downstream->disable_downstream_rtimer();
92   downstream->disable_downstream_wtimer();
93 
94   auto dconn = downstream->get_downstream_connection();
95 
96   if (dconn) {
97     dconn->on_timeout();
98   }
99 }
100 } // namespace
101 
102 namespace {
downstream_rtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)103 void downstream_rtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
104   downstream_timeoutcb(loop, w, EV_READ);
105 }
106 } // namespace
107 
108 namespace {
downstream_wtimeoutcb(struct ev_loop * loop,ev_timer * w,int revents)109 void downstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
110   downstream_timeoutcb(loop, w, EV_WRITE);
111 }
112 } // namespace
113 
114 // upstream could be nullptr for unittests
Downstream(Upstream * upstream,MemchunkPool * mcpool,int64_t stream_id)115 Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
116                        int64_t stream_id)
117     : dlnext(nullptr),
118       dlprev(nullptr),
119       response_sent_body_length(0),
120       balloc_(1024, 1024),
121       req_(balloc_),
122       resp_(balloc_),
123       request_start_time_(std::chrono::high_resolution_clock::now()),
124       blocked_request_buf_(mcpool),
125       request_buf_(mcpool),
126       response_buf_(mcpool),
127       upstream_(upstream),
128       blocked_link_(nullptr),
129       addr_(nullptr),
130       num_retry_(0),
131       stream_id_(stream_id),
132       assoc_stream_id_(-1),
133       downstream_stream_id_(-1),
134       response_rst_stream_error_code_(NGHTTP2_NO_ERROR),
135       affinity_cookie_(0),
136       request_state_(DownstreamState::INITIAL),
137       response_state_(DownstreamState::INITIAL),
138       dispatch_state_(DispatchState::NONE),
139       upgraded_(false),
140       chunked_request_(false),
141       chunked_response_(false),
142       expect_final_response_(false),
143       request_pending_(false),
144       request_header_sent_(false),
145       accesslog_written_(false),
146       new_affinity_cookie_(false),
147       blocked_request_data_eof_(false),
148       expect_100_continue_(false),
149       stop_reading_(false) {
150 
151   auto &timeoutconf = get_config()->http2.timeout;
152 
153   ev_timer_init(&upstream_rtimer_, &upstream_rtimeoutcb, 0.,
154                 timeoutconf.stream_read);
155   ev_timer_init(&upstream_wtimer_, &upstream_wtimeoutcb, 0.,
156                 timeoutconf.stream_write);
157   ev_timer_init(&downstream_rtimer_, &downstream_rtimeoutcb, 0.,
158                 timeoutconf.stream_read);
159   ev_timer_init(&downstream_wtimer_, &downstream_wtimeoutcb, 0.,
160                 timeoutconf.stream_write);
161 
162   upstream_rtimer_.data = this;
163   upstream_wtimer_.data = this;
164   downstream_rtimer_.data = this;
165   downstream_wtimer_.data = this;
166 
167   rcbufs_.reserve(32);
168 #ifdef ENABLE_HTTP3
169   rcbufs3_.reserve(32);
170 #endif // ENABLE_HTTP3
171 }
172 
~Downstream()173 Downstream::~Downstream() {
174   if (LOG_ENABLED(INFO)) {
175     DLOG(INFO, this) << "Deleting";
176   }
177 
178   // check nullptr for unittest
179   if (upstream_) {
180     auto loop = upstream_->get_client_handler()->get_loop();
181 
182     ev_timer_stop(loop, &upstream_rtimer_);
183     ev_timer_stop(loop, &upstream_wtimer_);
184     ev_timer_stop(loop, &downstream_rtimer_);
185     ev_timer_stop(loop, &downstream_wtimer_);
186 
187 #ifdef HAVE_MRUBY
188     auto handler = upstream_->get_client_handler();
189     auto worker = handler->get_worker();
190     auto mruby_ctx = worker->get_mruby_context();
191 
192     mruby_ctx->delete_downstream(this);
193 #endif // HAVE_MRUBY
194   }
195 
196 #ifdef HAVE_MRUBY
197   if (dconn_) {
198     const auto &group = dconn_->get_downstream_addr_group();
199     if (group) {
200       const auto &mruby_ctx = group->shared_addr->mruby_ctx;
201       mruby_ctx->delete_downstream(this);
202     }
203   }
204 #endif // HAVE_MRUBY
205 
206   // DownstreamConnection may refer to this object.  Delete it now
207   // explicitly.
208   dconn_.reset();
209 
210 #ifdef ENABLE_HTTP3
211   for (auto rcbuf : rcbufs3_) {
212     nghttp3_rcbuf_decref(rcbuf);
213   }
214 #endif // ENABLE_HTTP3
215 
216   for (auto rcbuf : rcbufs_) {
217     nghttp2_rcbuf_decref(rcbuf);
218   }
219 
220   if (LOG_ENABLED(INFO)) {
221     DLOG(INFO, this) << "Deleted";
222   }
223 }
224 
attach_downstream_connection(std::unique_ptr<DownstreamConnection> dconn)225 int Downstream::attach_downstream_connection(
226     std::unique_ptr<DownstreamConnection> dconn) {
227   if (dconn->attach_downstream(this) != 0) {
228     return -1;
229   }
230 
231   dconn_ = std::move(dconn);
232 
233   return 0;
234 }
235 
detach_downstream_connection()236 void Downstream::detach_downstream_connection() {
237   if (!dconn_) {
238     return;
239   }
240 
241 #ifdef HAVE_MRUBY
242   const auto &group = dconn_->get_downstream_addr_group();
243   if (group) {
244     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
245     mruby_ctx->delete_downstream(this);
246   }
247 #endif // HAVE_MRUBY
248 
249   dconn_->detach_downstream(this);
250 
251   auto handler = dconn_->get_client_handler();
252 
253   handler->pool_downstream_connection(
254       std::unique_ptr<DownstreamConnection>(dconn_.release()));
255 }
256 
get_downstream_connection()257 DownstreamConnection *Downstream::get_downstream_connection() {
258   return dconn_.get();
259 }
260 
pop_downstream_connection()261 std::unique_ptr<DownstreamConnection> Downstream::pop_downstream_connection() {
262 #ifdef HAVE_MRUBY
263   if (!dconn_) {
264     return nullptr;
265   }
266 
267   const auto &group = dconn_->get_downstream_addr_group();
268   if (group) {
269     const auto &mruby_ctx = group->shared_addr->mruby_ctx;
270     mruby_ctx->delete_downstream(this);
271   }
272 #endif // HAVE_MRUBY
273 
274   return std::unique_ptr<DownstreamConnection>(dconn_.release());
275 }
276 
pause_read(IOCtrlReason reason)277 void Downstream::pause_read(IOCtrlReason reason) {
278   if (dconn_) {
279     dconn_->pause_read(reason);
280   }
281 }
282 
resume_read(IOCtrlReason reason,size_t consumed)283 int Downstream::resume_read(IOCtrlReason reason, size_t consumed) {
284   if (dconn_) {
285     return dconn_->resume_read(reason, consumed);
286   }
287 
288   return 0;
289 }
290 
force_resume_read()291 void Downstream::force_resume_read() {
292   if (dconn_) {
293     dconn_->force_resume_read();
294   }
295 }
296 
297 namespace {
298 const HeaderRefs::value_type *
search_header_linear_backwards(const HeaderRefs & headers,const StringRef & name)299 search_header_linear_backwards(const HeaderRefs &headers,
300                                const StringRef &name) {
301   for (auto it = headers.rbegin(); it != headers.rend(); ++it) {
302     auto &kv = *it;
303     if (kv.name == name) {
304       return &kv;
305     }
306   }
307   return nullptr;
308 }
309 } // namespace
310 
assemble_request_cookie()311 StringRef Downstream::assemble_request_cookie() {
312   size_t len = 0;
313 
314   for (auto &kv : req_.fs.headers()) {
315     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
316       continue;
317     }
318 
319     len += kv.value.size() + str_size("; ");
320   }
321 
322   auto iov = make_byte_ref(balloc_, len + 1);
323   auto p = iov.base;
324 
325   for (auto &kv : req_.fs.headers()) {
326     if (kv.token != http2::HD_COOKIE || kv.value.empty()) {
327       continue;
328     }
329 
330     auto end = std::end(kv.value);
331     for (auto it = std::begin(kv.value) + kv.value.size();
332          it != std::begin(kv.value); --it) {
333       auto c = *(it - 1);
334       if (c == ' ' || c == ';') {
335         continue;
336       }
337       end = it;
338       break;
339     }
340 
341     p = std::copy(std::begin(kv.value), end, p);
342     p = util::copy_lit(p, "; ");
343   }
344 
345   // cut trailing "; "
346   if (p - iov.base >= 2) {
347     p -= 2;
348   }
349 
350   return StringRef{iov.base, p};
351 }
352 
find_affinity_cookie(const StringRef & name)353 uint32_t Downstream::find_affinity_cookie(const StringRef &name) {
354   for (auto &kv : req_.fs.headers()) {
355     if (kv.token != http2::HD_COOKIE) {
356       continue;
357     }
358 
359     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
360       if (*it == '\t' || *it == ' ' || *it == ';') {
361         ++it;
362         continue;
363       }
364 
365       auto end = std::find(it, std::end(kv.value), '=');
366       if (end == std::end(kv.value)) {
367         return 0;
368       }
369 
370       if (!util::streq(name, StringRef{it, end})) {
371         it = std::find(it, std::end(kv.value), ';');
372         continue;
373       }
374 
375       it = std::find(end + 1, std::end(kv.value), ';');
376       auto val = StringRef{end + 1, it};
377       if (val.size() != 8) {
378         return 0;
379       }
380       uint32_t h = 0;
381       for (auto c : val) {
382         auto n = util::hex_to_uint(c);
383         if (n == 256) {
384           return 0;
385         }
386         h <<= 4;
387         h += n;
388       }
389       affinity_cookie_ = h;
390       return h;
391     }
392   }
393   return 0;
394 }
395 
count_crumble_request_cookie()396 size_t Downstream::count_crumble_request_cookie() {
397   size_t n = 0;
398   for (auto &kv : req_.fs.headers()) {
399     if (kv.token != http2::HD_COOKIE) {
400       continue;
401     }
402 
403     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
404       if (*it == '\t' || *it == ' ' || *it == ';') {
405         ++it;
406         continue;
407       }
408 
409       it = std::find(it, std::end(kv.value), ';');
410 
411       ++n;
412     }
413   }
414   return n;
415 }
416 
crumble_request_cookie(std::vector<nghttp2_nv> & nva)417 void Downstream::crumble_request_cookie(std::vector<nghttp2_nv> &nva) {
418   for (auto &kv : req_.fs.headers()) {
419     if (kv.token != http2::HD_COOKIE) {
420       continue;
421     }
422 
423     for (auto it = std::begin(kv.value); it != std::end(kv.value);) {
424       if (*it == '\t' || *it == ' ' || *it == ';') {
425         ++it;
426         continue;
427       }
428 
429       auto first = it;
430 
431       it = std::find(it, std::end(kv.value), ';');
432 
433       nva.push_back({(uint8_t *)"cookie", (uint8_t *)first, str_size("cookie"),
434                      (size_t)(it - first),
435                      (uint8_t)(NGHTTP2_NV_FLAG_NO_COPY_NAME |
436                                NGHTTP2_NV_FLAG_NO_COPY_VALUE |
437                                (kv.no_index ? NGHTTP2_NV_FLAG_NO_INDEX : 0))});
438     }
439   }
440 }
441 
442 namespace {
add_header(size_t & sum,HeaderRefs & headers,const StringRef & name,const StringRef & value,bool no_index,int32_t token)443 void add_header(size_t &sum, HeaderRefs &headers, const StringRef &name,
444                 const StringRef &value, bool no_index, int32_t token) {
445   sum += name.size() + value.size();
446   headers.emplace_back(name, value, no_index, token);
447 }
448 } // namespace
449 
450 namespace {
alloc_header_name(BlockAllocator & balloc,const StringRef & name)451 StringRef alloc_header_name(BlockAllocator &balloc, const StringRef &name) {
452   auto iov = make_byte_ref(balloc, name.size() + 1);
453   auto p = iov.base;
454   p = std::copy(std::begin(name), std::end(name), p);
455   util::inp_strlower(iov.base, p);
456   *p = '\0';
457 
458   return StringRef{iov.base, p};
459 }
460 } // namespace
461 
462 namespace {
append_last_header_key(BlockAllocator & balloc,bool & key_prev,size_t & sum,HeaderRefs & headers,const char * data,size_t len)463 void append_last_header_key(BlockAllocator &balloc, bool &key_prev, size_t &sum,
464                             HeaderRefs &headers, const char *data, size_t len) {
465   assert(key_prev);
466   sum += len;
467   auto &item = headers.back();
468   auto name =
469       realloc_concat_string_ref(balloc, item.name, StringRef{data, len});
470 
471   auto p = const_cast<uint8_t *>(name.byte());
472   util::inp_strlower(p + name.size() - len, p + name.size());
473 
474   item.name = name;
475   item.token = http2::lookup_token(item.name);
476 }
477 } // namespace
478 
479 namespace {
append_last_header_value(BlockAllocator & balloc,bool & key_prev,size_t & sum,HeaderRefs & headers,const char * data,size_t len)480 void append_last_header_value(BlockAllocator &balloc, bool &key_prev,
481                               size_t &sum, HeaderRefs &headers,
482                               const char *data, size_t len) {
483   key_prev = false;
484   sum += len;
485   auto &item = headers.back();
486   item.value =
487       realloc_concat_string_ref(balloc, item.value, StringRef{data, len});
488 }
489 } // namespace
490 
parse_content_length()491 int FieldStore::parse_content_length() {
492   content_length = -1;
493 
494   for (auto &kv : headers_) {
495     if (kv.token != http2::HD_CONTENT_LENGTH) {
496       continue;
497     }
498 
499     auto len = util::parse_uint(kv.value);
500     if (len == -1) {
501       return -1;
502     }
503     if (content_length != -1) {
504       return -1;
505     }
506     content_length = len;
507   }
508   return 0;
509 }
510 
header(int32_t token) const511 const HeaderRefs::value_type *FieldStore::header(int32_t token) const {
512   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
513     auto &kv = *it;
514     if (kv.token == token) {
515       return &kv;
516     }
517   }
518   return nullptr;
519 }
520 
header(int32_t token)521 HeaderRefs::value_type *FieldStore::header(int32_t token) {
522   for (auto it = headers_.rbegin(); it != headers_.rend(); ++it) {
523     auto &kv = *it;
524     if (kv.token == token) {
525       return &kv;
526     }
527   }
528   return nullptr;
529 }
530 
header(const StringRef & name) const531 const HeaderRefs::value_type *FieldStore::header(const StringRef &name) const {
532   return search_header_linear_backwards(headers_, name);
533 }
534 
add_header_token(const StringRef & name,const StringRef & value,bool no_index,int32_t token)535 void FieldStore::add_header_token(const StringRef &name, const StringRef &value,
536                                   bool no_index, int32_t token) {
537   shrpx::add_header(buffer_size_, headers_, name, value, no_index, token);
538 }
539 
alloc_add_header_name(const StringRef & name)540 void FieldStore::alloc_add_header_name(const StringRef &name) {
541   auto name_ref = alloc_header_name(balloc_, name);
542   auto token = http2::lookup_token(name_ref);
543   add_header_token(name_ref, StringRef{}, false, token);
544   header_key_prev_ = true;
545 }
546 
append_last_header_key(const char * data,size_t len)547 void FieldStore::append_last_header_key(const char *data, size_t len) {
548   shrpx::append_last_header_key(balloc_, header_key_prev_, buffer_size_,
549                                 headers_, data, len);
550 }
551 
append_last_header_value(const char * data,size_t len)552 void FieldStore::append_last_header_value(const char *data, size_t len) {
553   shrpx::append_last_header_value(balloc_, header_key_prev_, buffer_size_,
554                                   headers_, data, len);
555 }
556 
clear_headers()557 void FieldStore::clear_headers() {
558   headers_.clear();
559   header_key_prev_ = false;
560 }
561 
add_trailer_token(const StringRef & name,const StringRef & value,bool no_index,int32_t token)562 void FieldStore::add_trailer_token(const StringRef &name,
563                                    const StringRef &value, bool no_index,
564                                    int32_t token) {
565   // Header size limit should be applied to all header and trailer
566   // fields combined.
567   shrpx::add_header(buffer_size_, trailers_, name, value, no_index, token);
568 }
569 
alloc_add_trailer_name(const StringRef & name)570 void FieldStore::alloc_add_trailer_name(const StringRef &name) {
571   auto name_ref = alloc_header_name(balloc_, name);
572   auto token = http2::lookup_token(name_ref);
573   add_trailer_token(name_ref, StringRef{}, false, token);
574   trailer_key_prev_ = true;
575 }
576 
append_last_trailer_key(const char * data,size_t len)577 void FieldStore::append_last_trailer_key(const char *data, size_t len) {
578   shrpx::append_last_header_key(balloc_, trailer_key_prev_, buffer_size_,
579                                 trailers_, data, len);
580 }
581 
append_last_trailer_value(const char * data,size_t len)582 void FieldStore::append_last_trailer_value(const char *data, size_t len) {
583   shrpx::append_last_header_value(balloc_, trailer_key_prev_, buffer_size_,
584                                   trailers_, data, len);
585 }
586 
erase_content_length_and_transfer_encoding()587 void FieldStore::erase_content_length_and_transfer_encoding() {
588   for (auto &kv : headers_) {
589     switch (kv.token) {
590     case http2::HD_CONTENT_LENGTH:
591     case http2::HD_TRANSFER_ENCODING:
592       kv.name = StringRef{};
593       kv.token = -1;
594       break;
595     }
596   }
597 }
598 
set_request_start_time(std::chrono::high_resolution_clock::time_point time)599 void Downstream::set_request_start_time(
600     std::chrono::high_resolution_clock::time_point time) {
601   request_start_time_ = std::move(time);
602 }
603 
604 const std::chrono::high_resolution_clock::time_point &
get_request_start_time() const605 Downstream::get_request_start_time() const {
606   return request_start_time_;
607 }
608 
reset_upstream(Upstream * upstream)609 void Downstream::reset_upstream(Upstream *upstream) {
610   upstream_ = upstream;
611   if (dconn_) {
612     dconn_->on_upstream_change(upstream);
613   }
614 }
615 
get_upstream() const616 Upstream *Downstream::get_upstream() const { return upstream_; }
617 
set_stream_id(int64_t stream_id)618 void Downstream::set_stream_id(int64_t stream_id) { stream_id_ = stream_id; }
619 
get_stream_id() const620 int64_t Downstream::get_stream_id() const { return stream_id_; }
621 
set_request_state(DownstreamState state)622 void Downstream::set_request_state(DownstreamState state) {
623   request_state_ = state;
624 }
625 
get_request_state() const626 DownstreamState Downstream::get_request_state() const { return request_state_; }
627 
get_chunked_request() const628 bool Downstream::get_chunked_request() const { return chunked_request_; }
629 
set_chunked_request(bool f)630 void Downstream::set_chunked_request(bool f) { chunked_request_ = f; }
631 
request_buf_full()632 bool Downstream::request_buf_full() {
633   auto handler = upstream_->get_client_handler();
634   auto faddr = handler->get_upstream_addr();
635   auto worker = handler->get_worker();
636 
637   // We don't check buffer size here for API endpoint.
638   if (faddr->alt_mode == UpstreamAltMode::API) {
639     return false;
640   }
641 
642   if (dconn_) {
643     auto &downstreamconf = *worker->get_downstream_config();
644     return blocked_request_buf_.rleft() + request_buf_.rleft() >=
645            downstreamconf.request_buffer_size;
646   }
647 
648   return false;
649 }
650 
get_request_buf()651 DefaultMemchunks *Downstream::get_request_buf() { return &request_buf_; }
652 
653 // Call this function after this object is attached to
654 // Downstream. Otherwise, the program will crash.
push_request_headers()655 int Downstream::push_request_headers() {
656   if (!dconn_) {
657     DLOG(INFO, this) << "dconn_ is NULL";
658     return -1;
659   }
660   return dconn_->push_request_headers();
661 }
662 
push_upload_data_chunk(const uint8_t * data,size_t datalen)663 int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) {
664   req_.recv_body_length += datalen;
665 
666   if (!dconn_ && !request_header_sent_) {
667     blocked_request_buf_.append(data, datalen);
668     req_.unconsumed_body_length += datalen;
669     return 0;
670   }
671 
672   // Assumes that request headers have already been pushed to output
673   // buffer using push_request_headers().
674   if (!dconn_) {
675     DLOG(INFO, this) << "dconn_ is NULL";
676     return -1;
677   }
678   if (dconn_->push_upload_data_chunk(data, datalen) != 0) {
679     return -1;
680   }
681 
682   req_.unconsumed_body_length += datalen;
683 
684   return 0;
685 }
686 
end_upload_data()687 int Downstream::end_upload_data() {
688   if (!dconn_ && !request_header_sent_) {
689     blocked_request_data_eof_ = true;
690     return 0;
691   }
692   if (!dconn_) {
693     DLOG(INFO, this) << "dconn_ is NULL";
694     return -1;
695   }
696   return dconn_->end_upload_data();
697 }
698 
rewrite_location_response_header(const StringRef & upstream_scheme)699 void Downstream::rewrite_location_response_header(
700     const StringRef &upstream_scheme) {
701   auto hd = resp_.fs.header(http2::HD_LOCATION);
702   if (!hd) {
703     return;
704   }
705 
706   if (request_downstream_host_.empty() || req_.authority.empty()) {
707     return;
708   }
709 
710   http_parser_url u{};
711   auto rv = http_parser_parse_url(hd->value.c_str(), hd->value.size(), 0, &u);
712   if (rv != 0) {
713     return;
714   }
715 
716   auto new_uri = http2::rewrite_location_uri(balloc_, hd->value, u,
717                                              request_downstream_host_,
718                                              req_.authority, upstream_scheme);
719 
720   if (new_uri.empty()) {
721     return;
722   }
723 
724   hd->value = new_uri;
725 }
726 
get_chunked_response() const727 bool Downstream::get_chunked_response() const { return chunked_response_; }
728 
set_chunked_response(bool f)729 void Downstream::set_chunked_response(bool f) { chunked_response_ = f; }
730 
on_read()731 int Downstream::on_read() {
732   if (!dconn_) {
733     DLOG(INFO, this) << "dconn_ is NULL";
734     return -1;
735   }
736   return dconn_->on_read();
737 }
738 
set_response_state(DownstreamState state)739 void Downstream::set_response_state(DownstreamState state) {
740   response_state_ = state;
741 }
742 
get_response_state() const743 DownstreamState Downstream::get_response_state() const {
744   return response_state_;
745 }
746 
get_response_buf()747 DefaultMemchunks *Downstream::get_response_buf() { return &response_buf_; }
748 
response_buf_full()749 bool Downstream::response_buf_full() {
750   if (dconn_) {
751     auto handler = upstream_->get_client_handler();
752     auto worker = handler->get_worker();
753     auto &downstreamconf = *worker->get_downstream_config();
754 
755     return response_buf_.rleft() >= downstreamconf.response_buffer_size;
756   }
757 
758   return false;
759 }
760 
validate_request_recv_body_length() const761 bool Downstream::validate_request_recv_body_length() const {
762   if (req_.fs.content_length == -1) {
763     return true;
764   }
765 
766   if (req_.fs.content_length != req_.recv_body_length) {
767     if (LOG_ENABLED(INFO)) {
768       DLOG(INFO, this) << "request invalid bodylen: content-length="
769                        << req_.fs.content_length
770                        << ", received=" << req_.recv_body_length;
771     }
772     return false;
773   }
774 
775   return true;
776 }
777 
validate_response_recv_body_length() const778 bool Downstream::validate_response_recv_body_length() const {
779   if (!expect_response_body() || resp_.fs.content_length == -1) {
780     return true;
781   }
782 
783   if (resp_.fs.content_length != resp_.recv_body_length) {
784     if (LOG_ENABLED(INFO)) {
785       DLOG(INFO, this) << "response invalid bodylen: content-length="
786                        << resp_.fs.content_length
787                        << ", received=" << resp_.recv_body_length;
788     }
789     return false;
790   }
791 
792   return true;
793 }
794 
check_upgrade_fulfilled_http2()795 void Downstream::check_upgrade_fulfilled_http2() {
796   // This handles nonzero req_.connect_proto and h1 frontend requests
797   // WebSocket upgrade.
798   upgraded_ = (req_.method == HTTP_CONNECT ||
799                req_.connect_proto == ConnectProto::WEBSOCKET) &&
800               resp_.http_status / 100 == 2;
801 }
802 
check_upgrade_fulfilled_http1()803 void Downstream::check_upgrade_fulfilled_http1() {
804   if (req_.method == HTTP_CONNECT) {
805     if (req_.connect_proto == ConnectProto::WEBSOCKET) {
806       if (resp_.http_status != 101) {
807         return;
808       }
809 
810       // This is done for HTTP/2 frontend only.
811       auto accept = resp_.fs.header(http2::HD_SEC_WEBSOCKET_ACCEPT);
812       if (!accept) {
813         return;
814       }
815 
816       std::array<uint8_t, base64::encode_length(20)> accept_buf;
817       auto expected =
818           http2::make_websocket_accept_token(accept_buf.data(), ws_key_);
819 
820       upgraded_ = expected != "" && expected == accept->value;
821     } else {
822       upgraded_ = resp_.http_status / 100 == 2;
823     }
824 
825     return;
826   }
827 
828   if (resp_.http_status == 101) {
829     // TODO Do more strict checking for upgrade headers
830     upgraded_ = req_.upgrade_request;
831 
832     return;
833   }
834 }
835 
inspect_http2_request()836 void Downstream::inspect_http2_request() {
837   if (req_.method == HTTP_CONNECT) {
838     req_.upgrade_request = true;
839   }
840 }
841 
inspect_http1_request()842 void Downstream::inspect_http1_request() {
843   if (req_.method == HTTP_CONNECT) {
844     req_.upgrade_request = true;
845   } else if (req_.http_minor > 0) {
846     auto upgrade = req_.fs.header(http2::HD_UPGRADE);
847     if (upgrade) {
848       const auto &val = upgrade->value;
849       // TODO Perform more strict checking for upgrade headers
850       if (util::streq_l(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID, val.c_str(),
851                         val.size())) {
852         req_.http2_upgrade_seen = true;
853       } else {
854         req_.upgrade_request = true;
855 
856         // TODO Should we check Sec-WebSocket-Key, and
857         // Sec-WebSocket-Version as well?
858         if (util::strieq_l("websocket", val)) {
859           req_.connect_proto = ConnectProto::WEBSOCKET;
860         }
861       }
862     }
863   }
864   auto transfer_encoding = req_.fs.header(http2::HD_TRANSFER_ENCODING);
865   if (transfer_encoding) {
866     req_.fs.content_length = -1;
867     if (util::iends_with_l(transfer_encoding->value, "chunked")) {
868       chunked_request_ = true;
869     }
870   }
871 
872   auto expect = req_.fs.header(http2::HD_EXPECT);
873   expect_100_continue_ =
874       expect &&
875       util::strieq(expect->value, StringRef::from_lit("100-continue"));
876 }
877 
inspect_http1_response()878 void Downstream::inspect_http1_response() {
879   auto transfer_encoding = resp_.fs.header(http2::HD_TRANSFER_ENCODING);
880   if (transfer_encoding) {
881     resp_.fs.content_length = -1;
882     if (util::iends_with_l(transfer_encoding->value, "chunked")) {
883       chunked_response_ = true;
884     }
885   }
886 }
887 
reset_response()888 void Downstream::reset_response() {
889   resp_.http_status = 0;
890   resp_.http_major = 1;
891   resp_.http_minor = 1;
892 }
893 
get_non_final_response() const894 bool Downstream::get_non_final_response() const {
895   return !upgraded_ && resp_.http_status / 100 == 1;
896 }
897 
supports_non_final_response() const898 bool Downstream::supports_non_final_response() const {
899   return req_.http_major == 3 || req_.http_major == 2 ||
900          (req_.http_major == 1 && req_.http_minor == 1);
901 }
902 
get_upgraded() const903 bool Downstream::get_upgraded() const { return upgraded_; }
904 
get_http2_upgrade_request() const905 bool Downstream::get_http2_upgrade_request() const {
906   return req_.http2_upgrade_seen && req_.fs.header(http2::HD_HTTP2_SETTINGS) &&
907          response_state_ == DownstreamState::INITIAL;
908 }
909 
get_http2_settings() const910 StringRef Downstream::get_http2_settings() const {
911   auto http2_settings = req_.fs.header(http2::HD_HTTP2_SETTINGS);
912   if (!http2_settings) {
913     return StringRef{};
914   }
915   return http2_settings->value;
916 }
917 
set_downstream_stream_id(int64_t stream_id)918 void Downstream::set_downstream_stream_id(int64_t stream_id) {
919   downstream_stream_id_ = stream_id;
920 }
921 
get_downstream_stream_id() const922 int64_t Downstream::get_downstream_stream_id() const {
923   return downstream_stream_id_;
924 }
925 
get_response_rst_stream_error_code() const926 uint32_t Downstream::get_response_rst_stream_error_code() const {
927   return response_rst_stream_error_code_;
928 }
929 
set_response_rst_stream_error_code(uint32_t error_code)930 void Downstream::set_response_rst_stream_error_code(uint32_t error_code) {
931   response_rst_stream_error_code_ = error_code;
932 }
933 
set_expect_final_response(bool f)934 void Downstream::set_expect_final_response(bool f) {
935   expect_final_response_ = f;
936 }
937 
get_expect_final_response() const938 bool Downstream::get_expect_final_response() const {
939   return expect_final_response_;
940 }
941 
expect_response_body() const942 bool Downstream::expect_response_body() const {
943   return !resp_.headers_only &&
944          http2::expect_response_body(req_.method, resp_.http_status);
945 }
946 
expect_response_trailer() const947 bool Downstream::expect_response_trailer() const {
948   // In HTTP/2, if final response HEADERS does not bear END_STREAM it
949   // is possible trailer fields might come, regardless of request
950   // method or status code.
951   return !resp_.headers_only &&
952          (resp_.http_major == 3 || resp_.http_major == 2);
953 }
954 
955 namespace {
reset_timer(struct ev_loop * loop,ev_timer * w)956 void reset_timer(struct ev_loop *loop, ev_timer *w) { ev_timer_again(loop, w); }
957 } // namespace
958 
959 namespace {
try_reset_timer(struct ev_loop * loop,ev_timer * w)960 void try_reset_timer(struct ev_loop *loop, ev_timer *w) {
961   if (!ev_is_active(w)) {
962     return;
963   }
964   ev_timer_again(loop, w);
965 }
966 } // namespace
967 
968 namespace {
ensure_timer(struct ev_loop * loop,ev_timer * w)969 void ensure_timer(struct ev_loop *loop, ev_timer *w) {
970   if (ev_is_active(w)) {
971     return;
972   }
973   ev_timer_again(loop, w);
974 }
975 } // namespace
976 
977 namespace {
disable_timer(struct ev_loop * loop,ev_timer * w)978 void disable_timer(struct ev_loop *loop, ev_timer *w) {
979   ev_timer_stop(loop, w);
980 }
981 } // namespace
982 
reset_upstream_rtimer()983 void Downstream::reset_upstream_rtimer() {
984   if (get_config()->http2.timeout.stream_read == 0.) {
985     return;
986   }
987   auto loop = upstream_->get_client_handler()->get_loop();
988   reset_timer(loop, &upstream_rtimer_);
989 }
990 
reset_upstream_wtimer()991 void Downstream::reset_upstream_wtimer() {
992   auto loop = upstream_->get_client_handler()->get_loop();
993   auto &timeoutconf = get_config()->http2.timeout;
994 
995   if (timeoutconf.stream_write != 0.) {
996     reset_timer(loop, &upstream_wtimer_);
997   }
998   if (timeoutconf.stream_read != 0.) {
999     try_reset_timer(loop, &upstream_rtimer_);
1000   }
1001 }
1002 
ensure_upstream_wtimer()1003 void Downstream::ensure_upstream_wtimer() {
1004   if (get_config()->http2.timeout.stream_write == 0.) {
1005     return;
1006   }
1007   auto loop = upstream_->get_client_handler()->get_loop();
1008   ensure_timer(loop, &upstream_wtimer_);
1009 }
1010 
disable_upstream_rtimer()1011 void Downstream::disable_upstream_rtimer() {
1012   if (get_config()->http2.timeout.stream_read == 0.) {
1013     return;
1014   }
1015   auto loop = upstream_->get_client_handler()->get_loop();
1016   disable_timer(loop, &upstream_rtimer_);
1017 }
1018 
disable_upstream_wtimer()1019 void Downstream::disable_upstream_wtimer() {
1020   if (get_config()->http2.timeout.stream_write == 0.) {
1021     return;
1022   }
1023   auto loop = upstream_->get_client_handler()->get_loop();
1024   disable_timer(loop, &upstream_wtimer_);
1025 }
1026 
reset_downstream_rtimer()1027 void Downstream::reset_downstream_rtimer() {
1028   if (get_config()->http2.timeout.stream_read == 0.) {
1029     return;
1030   }
1031   auto loop = upstream_->get_client_handler()->get_loop();
1032   reset_timer(loop, &downstream_rtimer_);
1033 }
1034 
reset_downstream_wtimer()1035 void Downstream::reset_downstream_wtimer() {
1036   auto loop = upstream_->get_client_handler()->get_loop();
1037   auto &timeoutconf = get_config()->http2.timeout;
1038 
1039   if (timeoutconf.stream_write != 0.) {
1040     reset_timer(loop, &downstream_wtimer_);
1041   }
1042   if (timeoutconf.stream_read != 0.) {
1043     try_reset_timer(loop, &downstream_rtimer_);
1044   }
1045 }
1046 
ensure_downstream_wtimer()1047 void Downstream::ensure_downstream_wtimer() {
1048   if (get_config()->http2.timeout.stream_write == 0.) {
1049     return;
1050   }
1051   auto loop = upstream_->get_client_handler()->get_loop();
1052   ensure_timer(loop, &downstream_wtimer_);
1053 }
1054 
disable_downstream_rtimer()1055 void Downstream::disable_downstream_rtimer() {
1056   if (get_config()->http2.timeout.stream_read == 0.) {
1057     return;
1058   }
1059   auto loop = upstream_->get_client_handler()->get_loop();
1060   disable_timer(loop, &downstream_rtimer_);
1061 }
1062 
disable_downstream_wtimer()1063 void Downstream::disable_downstream_wtimer() {
1064   if (get_config()->http2.timeout.stream_write == 0.) {
1065     return;
1066   }
1067   auto loop = upstream_->get_client_handler()->get_loop();
1068   disable_timer(loop, &downstream_wtimer_);
1069 }
1070 
accesslog_ready() const1071 bool Downstream::accesslog_ready() const {
1072   return !accesslog_written_ && resp_.http_status > 0;
1073 }
1074 
add_retry()1075 void Downstream::add_retry() { ++num_retry_; }
1076 
no_more_retry() const1077 bool Downstream::no_more_retry() const { return num_retry_ > 50; }
1078 
set_request_downstream_host(const StringRef & host)1079 void Downstream::set_request_downstream_host(const StringRef &host) {
1080   request_downstream_host_ = host;
1081 }
1082 
set_request_pending(bool f)1083 void Downstream::set_request_pending(bool f) { request_pending_ = f; }
1084 
get_request_pending() const1085 bool Downstream::get_request_pending() const { return request_pending_; }
1086 
set_request_header_sent(bool f)1087 void Downstream::set_request_header_sent(bool f) { request_header_sent_ = f; }
1088 
get_request_header_sent() const1089 bool Downstream::get_request_header_sent() const {
1090   return request_header_sent_;
1091 }
1092 
request_submission_ready() const1093 bool Downstream::request_submission_ready() const {
1094   return (request_state_ == DownstreamState::HEADER_COMPLETE ||
1095           request_state_ == DownstreamState::MSG_COMPLETE) &&
1096          (request_pending_ || !request_header_sent_) &&
1097          response_state_ == DownstreamState::INITIAL;
1098 }
1099 
get_dispatch_state() const1100 DispatchState Downstream::get_dispatch_state() const { return dispatch_state_; }
1101 
set_dispatch_state(DispatchState s)1102 void Downstream::set_dispatch_state(DispatchState s) { dispatch_state_ = s; }
1103 
attach_blocked_link(BlockedLink * l)1104 void Downstream::attach_blocked_link(BlockedLink *l) {
1105   assert(!blocked_link_);
1106 
1107   l->downstream = this;
1108   blocked_link_ = l;
1109 }
1110 
detach_blocked_link()1111 BlockedLink *Downstream::detach_blocked_link() {
1112   auto link = blocked_link_;
1113   blocked_link_ = nullptr;
1114   return link;
1115 }
1116 
can_detach_downstream_connection() const1117 bool Downstream::can_detach_downstream_connection() const {
1118   // We should check request and response buffer.  If request buffer
1119   // is not empty, then we might leave downstream connection in weird
1120   // state, especially for HTTP/1.1
1121   return dconn_ && response_state_ == DownstreamState::MSG_COMPLETE &&
1122          request_state_ == DownstreamState::MSG_COMPLETE && !upgraded_ &&
1123          !resp_.connection_close && request_buf_.rleft() == 0;
1124 }
1125 
pop_response_buf()1126 DefaultMemchunks Downstream::pop_response_buf() {
1127   return std::move(response_buf_);
1128 }
1129 
set_assoc_stream_id(int64_t stream_id)1130 void Downstream::set_assoc_stream_id(int64_t stream_id) {
1131   assoc_stream_id_ = stream_id;
1132 }
1133 
get_assoc_stream_id() const1134 int64_t Downstream::get_assoc_stream_id() const { return assoc_stream_id_; }
1135 
get_block_allocator()1136 BlockAllocator &Downstream::get_block_allocator() { return balloc_; }
1137 
add_rcbuf(nghttp2_rcbuf * rcbuf)1138 void Downstream::add_rcbuf(nghttp2_rcbuf *rcbuf) {
1139   nghttp2_rcbuf_incref(rcbuf);
1140   rcbufs_.push_back(rcbuf);
1141 }
1142 
1143 #ifdef ENABLE_HTTP3
add_rcbuf(nghttp3_rcbuf * rcbuf)1144 void Downstream::add_rcbuf(nghttp3_rcbuf *rcbuf) {
1145   nghttp3_rcbuf_incref(rcbuf);
1146   rcbufs3_.push_back(rcbuf);
1147 }
1148 #endif // ENABLE_HTTP3
1149 
set_downstream_addr_group(const std::shared_ptr<DownstreamAddrGroup> & group)1150 void Downstream::set_downstream_addr_group(
1151     const std::shared_ptr<DownstreamAddrGroup> &group) {
1152   group_ = group;
1153 }
1154 
set_addr(const DownstreamAddr * addr)1155 void Downstream::set_addr(const DownstreamAddr *addr) { addr_ = addr; }
1156 
get_addr() const1157 const DownstreamAddr *Downstream::get_addr() const { return addr_; }
1158 
set_accesslog_written(bool f)1159 void Downstream::set_accesslog_written(bool f) { accesslog_written_ = f; }
1160 
renew_affinity_cookie(uint32_t h)1161 void Downstream::renew_affinity_cookie(uint32_t h) {
1162   affinity_cookie_ = h;
1163   new_affinity_cookie_ = true;
1164 }
1165 
get_affinity_cookie_to_send() const1166 uint32_t Downstream::get_affinity_cookie_to_send() const {
1167   if (new_affinity_cookie_) {
1168     return affinity_cookie_;
1169   }
1170   return 0;
1171 }
1172 
get_blocked_request_buf()1173 DefaultMemchunks *Downstream::get_blocked_request_buf() {
1174   return &blocked_request_buf_;
1175 }
1176 
get_blocked_request_data_eof() const1177 bool Downstream::get_blocked_request_data_eof() const {
1178   return blocked_request_data_eof_;
1179 }
1180 
set_blocked_request_data_eof(bool f)1181 void Downstream::set_blocked_request_data_eof(bool f) {
1182   blocked_request_data_eof_ = f;
1183 }
1184 
set_ws_key(const StringRef & key)1185 void Downstream::set_ws_key(const StringRef &key) { ws_key_ = key; }
1186 
get_expect_100_continue() const1187 bool Downstream::get_expect_100_continue() const {
1188   return expect_100_continue_;
1189 }
1190 
get_stop_reading() const1191 bool Downstream::get_stop_reading() const { return stop_reading_; }
1192 
set_stop_reading(bool f)1193 void Downstream::set_stop_reading(bool f) { stop_reading_ = f; }
1194 
1195 } // namespace shrpx
1196