1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "shrpx_worker.h"
26 
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif // HAVE_UNISTD_H
30 
31 #include <cstdio>
32 #include <memory>
33 
34 #include <openssl/rand.h>
35 
36 #ifdef HAVE_LIBBPF
37 #  include <bpf/bpf.h>
38 #  include <bpf/libbpf.h>
39 #endif // HAVE_LIBBPF
40 
41 #include "shrpx_tls.h"
42 #include "shrpx_log.h"
43 #include "shrpx_client_handler.h"
44 #include "shrpx_http2_session.h"
45 #include "shrpx_log_config.h"
46 #include "shrpx_memcached_dispatcher.h"
47 #ifdef HAVE_MRUBY
48 #  include "shrpx_mruby.h"
49 #endif // HAVE_MRUBY
50 #ifdef ENABLE_HTTP3
51 #  include "shrpx_quic_listener.h"
52 #endif // ENABLE_HTTP3
53 #include "shrpx_connection_handler.h"
54 #include "util.h"
55 #include "template.h"
56 #include "xsi_strerror.h"
57 
58 namespace shrpx {
59 
60 namespace {
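// Invoked on this worker's loop when another thread signals the async
// watcher via ev_async_send() (see Worker::send); it drains the pending
// WorkerEvent queue.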
61 void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
62   auto worker = static_cast<Worker *>(w->data);
63   worker->process_events();
64 }
65 } // namespace
66 
67 namespace {
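// Invoked by the timer armed in Worker::schedule_clear_mcpool().  The
// memchunk pool is cleared only when this worker has no connections and
// every chunk has been returned to the free list.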
68 void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
69   auto worker = static_cast<Worker *>(w->data);
70   if (worker->get_worker_stat()->num_connections != 0) {
71     return;
72   }
73   auto mcpool = worker->get_mcpool();
74   if (mcpool->freelistsize == mcpool->poolsize) {
75     worker->get_mcpool()->clear();
76   }
77 }
78 } // namespace
79 
80 namespace {
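// Zero-delay timer callback that re-enters Worker::process_events() so the
// remaining queued events are handled one per loop iteration.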
81 void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) {
82   auto worker = static_cast<Worker *>(w->data);
83   worker->process_events();
84 }
85 } // namespace
86 
87 DownstreamAddrGroup::DownstreamAddrGroup() : retired{false} {}
88 
89 DownstreamAddrGroup::~DownstreamAddrGroup() {}
90 
91 // DownstreamKey is used to index SharedDownstreamAddr in order to
92 // find the same configuration.
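// Two backend groups whose DownstreamKeys compare equal can safely share a
// single SharedDownstreamAddr, and therefore their backend connections.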
93 using DownstreamKey =
94     std::tuple<std::vector<std::tuple<StringRef, StringRef, StringRef, size_t,
95                                       size_t, Proto, uint32_t, uint32_t,
96                                       uint32_t, bool, bool, bool, bool>>,
97                bool, SessionAffinity, StringRef, StringRef,
98                SessionAffinityCookieSecure, int64_t, int64_t, StringRef, bool>;
99 
100 namespace {
101 DownstreamKey
102 create_downstream_key(const std::shared_ptr<SharedDownstreamAddr> &shared_addr,
103                       const StringRef &mruby_file) {
104   DownstreamKey dkey;
105 
106   auto &addrs = std::get<0>(dkey);
107   addrs.resize(shared_addr->addrs.size());
108   auto p = std::begin(addrs);
109   for (auto &a : shared_addr->addrs) {
110     std::get<0>(*p) = a.host;
111     std::get<1>(*p) = a.sni;
112     std::get<2>(*p) = a.group;
113     std::get<3>(*p) = a.fall;
114     std::get<4>(*p) = a.rise;
115     std::get<5>(*p) = a.proto;
116     std::get<6>(*p) = a.port;
117     std::get<7>(*p) = a.weight;
118     std::get<8>(*p) = a.group_weight;
119     std::get<9>(*p) = a.host_unix;
120     std::get<10>(*p) = a.tls;
121     std::get<11>(*p) = a.dns;
122     std::get<12>(*p) = a.upgrade_scheme;
123     ++p;
124   }
125   std::sort(std::begin(addrs), std::end(addrs));
126 
127   std::get<1>(dkey) = shared_addr->redirect_if_not_tls;
128 
129   auto &affinity = shared_addr->affinity;
130   std::get<2>(dkey) = affinity.type;
131   std::get<3>(dkey) = affinity.cookie.name;
132   std::get<4>(dkey) = affinity.cookie.path;
133   std::get<5>(dkey) = affinity.cookie.secure;
134   auto &timeout = shared_addr->timeout;
135   std::get<6>(dkey) = timeout.read;
136   std::get<7>(dkey) = timeout.write;
137   std::get<8>(dkey) = mruby_file;
138   std::get<9>(dkey) = shared_addr->dnf;
139 
140   return dkey;
141 }
142 } // namespace
143 
144 Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
145                SSL_CTX *tls_session_cache_memcached_ssl_ctx,
146                tls::CertLookupTree *cert_tree,
147 #ifdef ENABLE_HTTP3
148                SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
149                const uint8_t *cid_prefix, size_t cid_prefixlen,
150 #  ifdef HAVE_LIBBPF
151                size_t index,
152 #  endif // HAVE_LIBBPF
153 #endif   // ENABLE_HTTP3
154                const std::shared_ptr<TicketKeys> &ticket_keys,
155                ConnectionHandler *conn_handler,
156                std::shared_ptr<DownstreamConfig> downstreamconf)
157     :
158 #if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
159       index_{index},
160 #endif // ENABLE_HTTP3 && HAVE_LIBBPF
161       randgen_(util::make_mt19937()),
162       worker_stat_{},
163       dns_tracker_(loop),
164 #ifdef ENABLE_HTTP3
165       quic_upstream_addrs_{get_config()->conn.quic_listener.addrs},
166 #endif // ENABLE_HTTP3
167       loop_(loop),
168       sv_ssl_ctx_(sv_ssl_ctx),
169       cl_ssl_ctx_(cl_ssl_ctx),
170       cert_tree_(cert_tree),
171       conn_handler_(conn_handler),
172 #ifdef ENABLE_HTTP3
173       quic_sv_ssl_ctx_{quic_sv_ssl_ctx},
174       quic_cert_tree_{quic_cert_tree},
175       quic_conn_handler_{this},
176 #endif // ENABLE_HTTP3
177       ticket_keys_(ticket_keys),
178       connect_blocker_(
179           std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
180       graceful_shutdown_(false) {
181 #ifdef ENABLE_HTTP3
182   std::copy_n(cid_prefix, cid_prefixlen, std::begin(cid_prefix_));
183 #endif // ENABLE_HTTP3
184 
185   ev_async_init(&w_, eventcb);
186   w_.data = this;
187   ev_async_start(loop_, &w_);
188 
189   ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.);
190   mcpool_clear_timer_.data = this;
191 
192   ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.);
193   proc_wev_timer_.data = this;
194 
195   auto &session_cacheconf = get_config()->tls.session_cache;
196 
197   if (!session_cacheconf.memcached.host.empty()) {
198     session_cache_memcached_dispatcher_ = std::make_unique<MemcachedDispatcher>(
199         &session_cacheconf.memcached.addr, loop,
200         tls_session_cache_memcached_ssl_ctx,
201         StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_);
202   }
203 
204   replace_downstream_config(std::move(downstreamconf));
205 }
206 
207 namespace {
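// Re-inserts |addr| into its WeightGroup's priority queue, and the group
// into |wgpq| if it is not queued yet.  Both entries take the cycle value
// currently at the top of the respective queue, so a backend coming back
// online rejoins the rotation at the current position.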
208 void ensure_enqueue_addr(
209     std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
210                         WeightGroupEntryGreater> &wgpq,
211     WeightGroup *wg, DownstreamAddr *addr) {
212   uint32_t cycle;
213   if (!wg->pq.empty()) {
214     auto &top = wg->pq.top();
215     cycle = top.cycle;
216   } else {
217     cycle = 0;
218   }
219 
220   addr->cycle = cycle;
221   addr->pending_penalty = 0;
222   wg->pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
223   addr->queued = true;
224 
225   if (!wg->queued) {
226     if (!wgpq.empty()) {
227       auto &top = wgpq.top();
228       cycle = top.cycle;
229     } else {
230       cycle = 0;
231     }
232 
233     wg->cycle = cycle;
234     wg->pending_penalty = 0;
235     wgpq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
236     wg->queued = true;
237   }
238 }
239 } // namespace
240 
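// Retires the currently configured backend groups, drains their connection
// pools, and rebuilds the groups from |downstreamconf|.  Groups that produce
// the same DownstreamKey share one SharedDownstreamAddr so that connections
// and health-check state are reused across patterns.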
241 void Worker::replace_downstream_config(
242     std::shared_ptr<DownstreamConfig> downstreamconf) {
243   for (auto &g : downstream_addr_groups_) {
244     g->retired = true;
245 
246     auto &shared_addr = g->shared_addr;
247     for (auto &addr : shared_addr->addrs) {
248       addr.dconn_pool->remove_all();
249     }
250   }
251 
252   downstreamconf_ = downstreamconf;
253 
254   // Making a copy here is much faster when multiple threads handle
255   // backendconfig API calls.
256   auto groups = downstreamconf->addr_groups;
257 
258   downstream_addr_groups_ =
259       std::vector<std::shared_ptr<DownstreamAddrGroup>>(groups.size());
260 
261   std::map<DownstreamKey, size_t> addr_groups_indexer;
262 #ifdef HAVE_MRUBY
263   // TODO It is a bit less efficient because
264   // mruby::create_mruby_context returns std::unique_ptr and we cannot
265   // use std::make_shared.
266   std::map<StringRef, std::shared_ptr<mruby::MRubyContext>> shared_mruby_ctxs;
267 #endif // HAVE_MRUBY
268 
269   for (size_t i = 0; i < groups.size(); ++i) {
270     auto &src = groups[i];
271     auto &dst = downstream_addr_groups_[i];
272 
273     dst = std::make_shared<DownstreamAddrGroup>();
274     dst->pattern =
275         ImmutableString{std::begin(src.pattern), std::end(src.pattern)};
276 
277     auto shared_addr = std::make_shared<SharedDownstreamAddr>();
278 
279     shared_addr->addrs.resize(src.addrs.size());
280     shared_addr->affinity.type = src.affinity.type;
281     if (src.affinity.type == SessionAffinity::COOKIE) {
282       shared_addr->affinity.cookie.name =
283           make_string_ref(shared_addr->balloc, src.affinity.cookie.name);
284       if (!src.affinity.cookie.path.empty()) {
285         shared_addr->affinity.cookie.path =
286             make_string_ref(shared_addr->balloc, src.affinity.cookie.path);
287       }
288       shared_addr->affinity.cookie.secure = src.affinity.cookie.secure;
289     }
290     shared_addr->affinity_hash = src.affinity_hash;
291     shared_addr->redirect_if_not_tls = src.redirect_if_not_tls;
292     shared_addr->dnf = src.dnf;
293     shared_addr->timeout.read = src.timeout.read;
294     shared_addr->timeout.write = src.timeout.write;
295 
296     for (size_t j = 0; j < src.addrs.size(); ++j) {
297       auto &src_addr = src.addrs[j];
298       auto &dst_addr = shared_addr->addrs[j];
299 
300       dst_addr.addr = src_addr.addr;
301       dst_addr.host = make_string_ref(shared_addr->balloc, src_addr.host);
302       dst_addr.hostport =
303           make_string_ref(shared_addr->balloc, src_addr.hostport);
304       dst_addr.port = src_addr.port;
305       dst_addr.host_unix = src_addr.host_unix;
306       dst_addr.weight = src_addr.weight;
307       dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group);
308       dst_addr.group_weight = src_addr.group_weight;
309       dst_addr.proto = src_addr.proto;
310       dst_addr.tls = src_addr.tls;
311       dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni);
312       dst_addr.fall = src_addr.fall;
313       dst_addr.rise = src_addr.rise;
314       dst_addr.dns = src_addr.dns;
315       dst_addr.upgrade_scheme = src_addr.upgrade_scheme;
316     }
317 
318 #ifdef HAVE_MRUBY
319     auto mruby_ctx_it = shared_mruby_ctxs.find(src.mruby_file);
320     if (mruby_ctx_it == std::end(shared_mruby_ctxs)) {
321       shared_addr->mruby_ctx = mruby::create_mruby_context(src.mruby_file);
322       assert(shared_addr->mruby_ctx);
323       shared_mruby_ctxs.emplace(src.mruby_file, shared_addr->mruby_ctx);
324     } else {
325       shared_addr->mruby_ctx = (*mruby_ctx_it).second;
326     }
327 #endif // HAVE_MRUBY
328 
329     // Share the connections if patterns have the same set of backend
330     // addresses.
331 
332     auto dkey = create_downstream_key(shared_addr, src.mruby_file);
333     auto it = addr_groups_indexer.find(dkey);
334 
335     if (it == std::end(addr_groups_indexer)) {
336       std::shuffle(std::begin(shared_addr->addrs), std::end(shared_addr->addrs),
337                    randgen_);
338 
339       auto shared_addr_ptr = shared_addr.get();
340 
341       for (auto &addr : shared_addr->addrs) {
342         addr.connect_blocker = std::make_unique<ConnectBlocker>(
343             randgen_, loop_, nullptr, [shared_addr_ptr, &addr]() {
344               if (!addr.queued) {
345                 if (!addr.wg) {
346                   return;
347                 }
348                 ensure_enqueue_addr(shared_addr_ptr->pq, addr.wg, &addr);
349               }
350             });
351 
352         addr.live_check = std::make_unique<LiveCheck>(loop_, cl_ssl_ctx_, this,
353                                                       &addr, randgen_);
354       }
355 
356       size_t seq = 0;
357       for (auto &addr : shared_addr->addrs) {
358         addr.dconn_pool = std::make_unique<DownstreamConnectionPool>();
359         addr.seq = seq++;
360       }
361 
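      // Without session affinity, addresses are partitioned into
      // WeightGroups by their "group" name.  Scheduling is two-level: a
      // priority queue of groups and per-group priority queues of
      // addresses, both ordered by cycle with seq as a tie-breaker.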
362       if (shared_addr->affinity.type == SessionAffinity::NONE) {
363         std::map<StringRef, WeightGroup *> wgs;
364         size_t num_wgs = 0;
365         for (auto &addr : shared_addr->addrs) {
366           if (wgs.find(addr.group) == std::end(wgs)) {
367             ++num_wgs;
368             wgs.emplace(addr.group, nullptr);
369           }
370         }
371 
372         shared_addr->wgs = std::vector<WeightGroup>(num_wgs);
373 
374         for (auto &addr : shared_addr->addrs) {
375           auto &wg = wgs[addr.group];
376           if (wg == nullptr) {
377             wg = &shared_addr->wgs[--num_wgs];
378             wg->seq = num_wgs;
379           }
380 
381           wg->weight = addr.group_weight;
382           wg->pq.push(DownstreamAddrEntry{&addr, addr.seq, addr.cycle});
383           addr.queued = true;
384           addr.wg = wg;
385         }
386 
387         assert(num_wgs == 0);
388 
389         for (auto &kv : wgs) {
390           shared_addr->pq.push(
391               WeightGroupEntry{kv.second, kv.second->seq, kv.second->cycle});
392           kv.second->queued = true;
393         }
394       }
395 
396       dst->shared_addr = shared_addr;
397 
398       addr_groups_indexer.emplace(std::move(dkey), i);
399     } else {
400       auto &g = *(std::begin(downstream_addr_groups_) + (*it).second);
401       if (LOG_ENABLED(INFO)) {
402         LOG(INFO) << dst->pattern << " shares the same backend group with "
403                   << g->pattern;
404       }
405       dst->shared_addr = g->shared_addr;
406     }
407   }
408 }
409 
410 Worker::~Worker() {
411   ev_async_stop(loop_, &w_);
412   ev_timer_stop(loop_, &mcpool_clear_timer_);
413   ev_timer_stop(loop_, &proc_wev_timer_);
414 }
415 
416 void Worker::schedule_clear_mcpool() {
417   // libev manual says: "If the watcher is already active nothing will
418   // happen."  Since we don't change any timeout here, we don't have
419   // to worry about querying ev_is_active.
420   ev_timer_start(loop_, &mcpool_clear_timer_);
421 }
422 
423 void Worker::wait() {
424 #ifndef NOTHREADS
425   fut_.get();
426 #endif // !NOTHREADS
427 }
428 
429 void Worker::run_async() {
430 #ifndef NOTHREADS
431   fut_ = std::async(std::launch::async, [this] {
432     (void)reopen_log_files(get_config()->logging);
433     ev_run(loop_);
434     delete_log_config();
435   });
436 #endif // !NOTHREADS
437 }
438 
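// May be called from any thread: the event is appended under the mutex and
// this worker's loop is woken up through the async watcher.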
439 void Worker::send(WorkerEvent event) {
440   {
441     std::lock_guard<std::mutex> g(m_);
442 
443     q_.emplace_back(std::move(event));
444   }
445 
446   ev_async_send(loop_, &w_);
447 }
448 
449 void Worker::process_events() {
450   WorkerEvent wev;
451   {
452     std::lock_guard<std::mutex> g(m_);
453 
454     // Process events one at a time.  This is important for
455     // WorkerEventType::NEW_CONNECTION events since accepting a large
456     // number of new connections at once may delay the time to first byte
457     // for existing connections.
458 
459     if (q_.empty()) {
460       ev_timer_stop(loop_, &proc_wev_timer_);
461       return;
462     }
463 
464     wev = std::move(q_.front());
465     q_.pop_front();
466   }
467 
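  // Re-arm the zero-delay timer so that any events still in the queue are
  // processed on subsequent loop iterations.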
468   ev_timer_start(loop_, &proc_wev_timer_);
469 
470   auto config = get_config();
471 
472   auto worker_connections = config->conn.upstream.worker_connections;
473 
474   switch (wev.type) {
475   case WorkerEventType::NEW_CONNECTION: {
476     if (LOG_ENABLED(INFO)) {
477       WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
478                        << ", addrlen=" << wev.client_addrlen;
479     }
480 
481     if (worker_stat_.num_connections >= worker_connections) {
482 
483       if (LOG_ENABLED(INFO)) {
484         WLOG(INFO, this) << "Too many connections >= " << worker_connections;
485       }
486 
487       close(wev.client_fd);
488 
489       break;
490     }
491 
492     auto client_handler =
493         tls::accept_connection(this, wev.client_fd, &wev.client_addr.sa,
494                                wev.client_addrlen, wev.faddr);
495     if (!client_handler) {
496       if (LOG_ENABLED(INFO)) {
497         WLOG(ERROR, this) << "ClientHandler creation failed";
498       }
499       close(wev.client_fd);
500       break;
501     }
502 
503     if (LOG_ENABLED(INFO)) {
504       WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created ";
505     }
506 
507     break;
508   }
509   case WorkerEventType::REOPEN_LOG:
510     WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this
511                        << ")";
512 
513     reopen_log_files(config->logging);
514 
515     break;
516   case WorkerEventType::GRACEFUL_SHUTDOWN:
517     WLOG(NOTICE, this) << "Graceful shutdown commencing";
518 
519     graceful_shutdown_ = true;
520 
521     if (worker_stat_.num_connections == 0 &&
522         worker_stat_.num_close_waits == 0) {
523       ev_break(loop_);
524 
525       return;
526     }
527 
528     break;
529   case WorkerEventType::REPLACE_DOWNSTREAM:
530     WLOG(NOTICE, this) << "Replace downstream";
531 
532     replace_downstream_config(wev.downstreamconf);
533 
534     break;
535 #ifdef ENABLE_HTTP3
536   case WorkerEventType::QUIC_PKT_FORWARD: {
537     const UpstreamAddr *faddr;
538 
539     if (wev.quic_pkt->upstream_addr_index == static_cast<size_t>(-1)) {
540       faddr = find_quic_upstream_addr(wev.quic_pkt->local_addr);
541       if (faddr == nullptr) {
542         LOG(ERROR) << "No suitable upstream address found";
543 
544         break;
545       }
546     } else if (quic_upstream_addrs_.size() <=
547                wev.quic_pkt->upstream_addr_index) {
548       LOG(ERROR) << "upstream_addr_index is too large";
549 
550       break;
551     } else {
552       faddr = &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index];
553     }
554 
555     quic_conn_handler_.handle_packet(
556         faddr, wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr,
557         wev.quic_pkt->data.data(), wev.quic_pkt->data.size());
558 
559     break;
560   }
561 #endif // ENABLE_HTTP3
562   default:
563     if (LOG_ENABLED(INFO)) {
564       WLOG(INFO, this) << "unknown event type " << static_cast<int>(wev.type);
565     }
566   }
567 }
568 
569 tls::CertLookupTree *Worker::get_cert_lookup_tree() const { return cert_tree_; }
570 
571 #ifdef ENABLE_HTTP3
572 tls::CertLookupTree *Worker::get_quic_cert_lookup_tree() const {
573   return quic_cert_tree_;
574 }
575 #endif // ENABLE_HTTP3
576 
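// Ticket keys are replaced by a single writer and read from worker threads;
// the snapshot is taken either with atomic shared_ptr operations or under a
// mutex, depending on platform support.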
577 std::shared_ptr<TicketKeys> Worker::get_ticket_keys() {
578 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
579   return std::atomic_load_explicit(&ticket_keys_, std::memory_order_acquire);
580 #else  // !HAVE_ATOMIC_STD_SHARED_PTR
581   std::lock_guard<std::mutex> g(ticket_keys_m_);
582   return ticket_keys_;
583 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
584 }
585 
586 void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
587 #ifdef HAVE_ATOMIC_STD_SHARED_PTR
588   // This is single writer
589   std::atomic_store_explicit(&ticket_keys_, std::move(ticket_keys),
590                              std::memory_order_release);
591 #else  // !HAVE_ATOMIC_STD_SHARED_PTR
592   std::lock_guard<std::mutex> g(ticket_keys_m_);
593   ticket_keys_ = std::move(ticket_keys);
594 #endif // !HAVE_ATOMIC_STD_SHARED_PTR
595 }
596 
597 WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
598 
599 struct ev_loop *Worker::get_loop() const {
600   return loop_;
601 }
602 
603 SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }
604 
605 SSL_CTX *Worker::get_cl_ssl_ctx() const { return cl_ssl_ctx_; }
606 
607 #ifdef ENABLE_HTTP3
608 SSL_CTX *Worker::get_quic_sv_ssl_ctx() const { return quic_sv_ssl_ctx_; }
609 #endif // ENABLE_HTTP3
610 
611 void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }
612 
613 bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }
614 
615 MemchunkPool *Worker::get_mcpool() { return &mcpool_; }
616 
617 MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() {
618   return session_cache_memcached_dispatcher_.get();
619 }
620 
621 std::mt19937 &Worker::get_randgen() { return randgen_; }
622 
623 #ifdef HAVE_MRUBY
624 int Worker::create_mruby_context() {
625   mruby_ctx_ = mruby::create_mruby_context(StringRef{get_config()->mruby_file});
626   if (!mruby_ctx_) {
627     return -1;
628   }
629 
630   return 0;
631 }
632 
633 mruby::MRubyContext *Worker::get_mruby_context() const {
634   return mruby_ctx_.get();
635 }
636 #endif // HAVE_MRUBY
637 
638 std::vector<std::shared_ptr<DownstreamAddrGroup>> &
639 Worker::get_downstream_addr_groups() {
640   return downstream_addr_groups_;
641 }
642 
643 ConnectBlocker *Worker::get_connect_blocker() const {
644   return connect_blocker_.get();
645 }
646 
647 const DownstreamConfig *Worker::get_downstream_config() const {
648   return downstreamconf_.get();
649 }
650 
651 ConnectionHandler *Worker::get_connection_handler() const {
652   return conn_handler_;
653 }
654 
655 #ifdef ENABLE_HTTP3
656 QUICConnectionHandler *Worker::get_quic_connection_handler() {
657   return &quic_conn_handler_;
658 }
659 #endif // ENABLE_HTTP3
660 
661 DNSTracker *Worker::get_dns_tracker() { return &dns_tracker_; }
662 
663 #ifdef ENABLE_HTTP3
664 #  ifdef HAVE_LIBBPF
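// Only one worker opens and attaches the eBPF SO_REUSEPORT program; the
// other workers merely update the shared maps.  When the API endpoint is
// enabled in multi-threaded mode, index 0 appears to be reserved for the
// API worker, so the first QUIC worker is index 1 and socket indices are
// shifted down by one in compute_sk_index().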
665 bool Worker::should_attach_bpf() const {
666   auto config = get_config();
667   auto &quicconf = config->quic;
668   auto &apiconf = config->api;
669 
670   if (quicconf.bpf.disabled) {
671     return false;
672   }
673 
674   if (!config->single_thread && apiconf.enabled) {
675     return index_ == 1;
676   }
677 
678   return index_ == 0;
679 }
680 
681 bool Worker::should_update_bpf_map() const {
682   auto config = get_config();
683   auto &quicconf = config->quic;
684 
685   return !quicconf.bpf.disabled;
686 }
687 
688 uint32_t Worker::compute_sk_index() const {
689   auto config = get_config();
690   auto &apiconf = config->api;
691 
692   if (!config->single_thread && apiconf.enabled) {
693     return index_ - 1;
694   }
695 
696   return index_;
697 }
698 #  endif // HAVE_LIBBPF
699 
700 int Worker::setup_quic_server_socket() {
701   size_t n = 0;
702 
703   for (auto &addr : quic_upstream_addrs_) {
704     assert(!addr.host_unix);
705     if (create_quic_server_socket(addr) != 0) {
706       return -1;
707     }
708 
709     // Make sure that each endpoint has a unique address.
710     for (size_t i = 0; i < n; ++i) {
711       const auto &a = quic_upstream_addrs_[i];
712 
713       if (addr.hostport == a.hostport) {
714         LOG(FATAL)
715             << "QUIC frontend endpoint must be unique: a duplicate found for "
716             << addr.hostport;
717 
718         return -1;
719       }
720     }
721 
722     ++n;
723 
724     quic_listeners_.emplace_back(std::make_unique<QUICListener>(&addr, this));
725   }
726 
727   return 0;
728 }
729 
730 int Worker::create_quic_server_socket(UpstreamAddr &faddr) {
731   std::array<char, STRERROR_BUFSIZE> errbuf;
732   int fd = -1;
733   int rv;
734 
735   auto service = util::utos(faddr.port);
736   addrinfo hints{};
737   hints.ai_family = faddr.family;
738   hints.ai_socktype = SOCK_DGRAM;
739   hints.ai_flags = AI_PASSIVE;
740 #  ifdef AI_ADDRCONFIG
741   hints.ai_flags |= AI_ADDRCONFIG;
742 #  endif // AI_ADDRCONFIG
743 
744   auto node =
745       faddr.host == StringRef::from_lit("*") ? nullptr : faddr.host.c_str();
746 
747   addrinfo *res, *rp;
748   rv = getaddrinfo(node, service.c_str(), &hints, &res);
749 #  ifdef AI_ADDRCONFIG
750   if (rv != 0) {
751     // Retry without AI_ADDRCONFIG
752     hints.ai_flags &= ~AI_ADDRCONFIG;
753     rv = getaddrinfo(node, service.c_str(), &hints, &res);
754   }
755 #  endif // AI_ADDRCONFIG
756   if (rv != 0) {
757     LOG(FATAL) << "Unable to get IPv" << (faddr.family == AF_INET ? "4" : "6")
758                << " address for " << faddr.host << ", port " << faddr.port
759                << ": " << gai_strerror(rv);
760     return -1;
761   }
762 
763   auto res_d = defer(freeaddrinfo, res);
764 
765   std::array<char, NI_MAXHOST> host;
766 
767   for (rp = res; rp; rp = rp->ai_next) {
768     rv = getnameinfo(rp->ai_addr, rp->ai_addrlen, host.data(), host.size(),
769                      nullptr, 0, NI_NUMERICHOST);
770     if (rv != 0) {
771       LOG(WARN) << "getnameinfo() failed: " << gai_strerror(rv);
772       continue;
773     }
774 
775 #  ifdef SOCK_NONBLOCK
776     fd = socket(rp->ai_family, rp->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
777                 rp->ai_protocol);
778     if (fd == -1) {
779       auto error = errno;
780       LOG(WARN) << "socket() syscall failed: "
781                 << xsi_strerror(error, errbuf.data(), errbuf.size());
782       continue;
783     }
784 #  else  // !SOCK_NONBLOCK
785     fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
786     if (fd == -1) {
787       auto error = errno;
788       LOG(WARN) << "socket() syscall failed: "
789                 << xsi_strerror(error, errbuf.data(), errbuf.size());
790       continue;
791     }
792     util::make_socket_nonblocking(fd);
793     util::make_socket_closeonexec(fd);
794 #  endif // !SOCK_NONBLOCK
795 
796     int val = 1;
797     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val,
798                    static_cast<socklen_t>(sizeof(val))) == -1) {
799       auto error = errno;
800       LOG(WARN) << "Failed to set SO_REUSEADDR option to listener socket: "
801                 << xsi_strerror(error, errbuf.data(), errbuf.size());
802       close(fd);
803       continue;
804     }
805 
806     if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val,
807                    static_cast<socklen_t>(sizeof(val))) == -1) {
808       auto error = errno;
809       LOG(WARN) << "Failed to set SO_REUSEPORT option to listener socket: "
810                 << xsi_strerror(error, errbuf.data(), errbuf.size());
811       close(fd);
812       continue;
813     }
814 
815     if (faddr.family == AF_INET6) {
816 #  ifdef IPV6_V6ONLY
817       if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val,
818                      static_cast<socklen_t>(sizeof(val))) == -1) {
819         auto error = errno;
820         LOG(WARN) << "Failed to set IPV6_V6ONLY option to listener socket: "
821                   << xsi_strerror(error, errbuf.data(), errbuf.size());
822         close(fd);
823         continue;
824       }
825 #  endif // IPV6_V6ONLY
826 
827       if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &val,
828                      static_cast<socklen_t>(sizeof(val))) == -1) {
829         auto error = errno;
830         LOG(WARN)
831             << "Failed to set IPV6_RECVPKTINFO option to listener socket: "
832             << xsi_strerror(error, errbuf.data(), errbuf.size());
833         close(fd);
834         continue;
835       }
836     } else {
837       if (setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &val,
838                      static_cast<socklen_t>(sizeof(val))) == -1) {
839         auto error = errno;
840         LOG(WARN) << "Failed to set IP_PKTINFO option to listener socket: "
841                   << xsi_strerror(error, errbuf.data(), errbuf.size());
842         close(fd);
843         continue;
844       }
845     }
846 
847     // TODO Enable ECN
848 
849     if (bind(fd, rp->ai_addr, rp->ai_addrlen) == -1) {
850       auto error = errno;
851       LOG(WARN) << "bind() syscall failed: "
852                 << xsi_strerror(error, errbuf.data(), errbuf.size());
853       close(fd);
854       continue;
855     }
856 
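    // With libbpf, the first QUIC worker loads the SO_REUSEPORT BPF program
    // and attaches it to the socket; every worker then registers its socket
    // fd (keyed by sk_index) and its CID prefix in the shared BPF maps so
    // the reuseport program can steer packets of an existing connection
    // back to the owning worker.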
857 #  ifdef HAVE_LIBBPF
858     auto config = get_config();
859 
860     auto &quic_bpf_refs = conn_handler_->get_quic_bpf_refs();
861     int err;
862 
863     if (should_attach_bpf()) {
864       auto &bpfconf = config->quic.bpf;
865 
866       auto obj = bpf_object__open_file(bpfconf.prog_file.c_str(), nullptr);
867       err = libbpf_get_error(obj);
868       if (err) {
869         LOG(FATAL) << "Failed to open bpf object file: "
870                    << xsi_strerror(-err, errbuf.data(), errbuf.size());
871         close(fd);
872         return -1;
873       }
874 
875       if (bpf_object__load(obj)) {
876         LOG(FATAL) << "Failed to load bpf object file: "
877                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
878         close(fd);
879         return -1;
880       }
881 
882       auto prog = bpf_object__find_program_by_name(obj, "select_reuseport");
883       err = libbpf_get_error(prog);
884       if (err) {
885         LOG(FATAL) << "Failed to find sk_reuseport program: "
886                    << xsi_strerror(-err, errbuf.data(), errbuf.size());
887         close(fd);
888         return -1;
889       }
890 
891       auto &ref = quic_bpf_refs[faddr.index];
892 
893       auto reuseport_array =
894           bpf_object__find_map_by_name(obj, "reuseport_array");
895       err = libbpf_get_error(reuseport_array);
896       if (err) {
897         LOG(FATAL) << "Failed to get reuseport_array: "
898                    << xsi_strerror(-err, errbuf.data(), errbuf.size());
899         close(fd);
900         return -1;
901       }
902 
903       ref.reuseport_array = bpf_map__fd(reuseport_array);
904 
905       auto cid_prefix_map = bpf_object__find_map_by_name(obj, "cid_prefix_map");
906       err = libbpf_get_error(cid_prefix_map);
907       if (err) {
908         LOG(FATAL) << "Failed to get cid_prefix_map: "
909                    << xsi_strerror(-err, errbuf.data(), errbuf.size());
910         close(fd);
911         return -1;
912       }
913 
914       ref.cid_prefix_map = bpf_map__fd(cid_prefix_map);
915 
916       auto sk_info = bpf_object__find_map_by_name(obj, "sk_info");
917       err = libbpf_get_error(sk_info);
918       if (err) {
919         LOG(FATAL) << "Failed to get sk_info: "
920                    << xsi_strerror(-err, errbuf.data(), errbuf.size());
921         close(fd);
922         return -1;
923       }
924 
925       constexpr uint32_t zero = 0;
926       uint64_t num_socks = config->num_worker;
927 
928       if (bpf_map_update_elem(bpf_map__fd(sk_info), &zero, &num_socks,
929                               BPF_ANY) != 0) {
930         LOG(FATAL) << "Failed to update sk_info: "
931                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
932         close(fd);
933         return -1;
934       }
935 
936       auto &quicconf = config->quic;
937 
938       constexpr uint32_t key_high_idx = 1;
939       constexpr uint32_t key_low_idx = 2;
940 
941       if (bpf_map_update_elem(bpf_map__fd(sk_info), &key_high_idx,
942                               quicconf.upstream.cid_encryption_key.data(),
943                               BPF_ANY) != 0) {
944         LOG(FATAL) << "Failed to update key_high_idx sk_info: "
945                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
946         close(fd);
947         return -1;
948       }
949 
950       if (bpf_map_update_elem(bpf_map__fd(sk_info), &key_low_idx,
951                               quicconf.upstream.cid_encryption_key.data() + 8,
952                               BPF_ANY) != 0) {
953         LOG(FATAL) << "Failed to update key_low_idx sk_info: "
954                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
955         close(fd);
956         return -1;
957       }
958 
959       auto prog_fd = bpf_program__fd(prog);
960 
961       if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_EBPF, &prog_fd,
962                      static_cast<socklen_t>(sizeof(prog_fd))) == -1) {
963         LOG(FATAL) << "Failed to attach bpf program: "
964                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
965         close(fd);
966         return -1;
967       }
968     }
969 
970     if (should_update_bpf_map()) {
971       const auto &ref = quic_bpf_refs[faddr.index];
972       auto sk_index = compute_sk_index();
973 
974       if (bpf_map_update_elem(ref.reuseport_array, &sk_index, &fd,
975                               BPF_NOEXIST) != 0) {
976         LOG(FATAL) << "Failed to update reuseport_array: "
977                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
978         close(fd);
979         return -1;
980       }
981 
982       if (bpf_map_update_elem(ref.cid_prefix_map, cid_prefix_.data(), &sk_index,
983                               BPF_NOEXIST) != 0) {
984         LOG(FATAL) << "Failed to update cid_prefix_map: "
985                    << xsi_strerror(errno, errbuf.data(), errbuf.size());
986         close(fd);
987         return -1;
988       }
989     }
990 #  endif // HAVE_LIBBPF
991 
992     break;
993   }
994 
995   if (!rp) {
996     LOG(FATAL) << "Listening " << (faddr.family == AF_INET ? "IPv4" : "IPv6")
997                << " socket failed";
998 
999     return -1;
1000   }
1001 
1002   faddr.fd = fd;
1003   faddr.hostport = util::make_http_hostport(mod_config()->balloc,
1004                                             StringRef{host.data()}, faddr.port);
1005 
1006   LOG(NOTICE) << "Listening on " << faddr.hostport << ", quic";
1007 
1008   return 0;
1009 }
1010 
1011 const uint8_t *Worker::get_cid_prefix() const { return cid_prefix_.data(); }
1012 
1013 void Worker::set_quic_secret(const std::shared_ptr<QUICSecret> &secret) {
1014   quic_secret_ = secret;
1015 }
1016 
1017 const std::shared_ptr<QUICSecret> &Worker::get_quic_secret() const {
1018   return quic_secret_;
1019 }
1020 
1021 const UpstreamAddr *Worker::find_quic_upstream_addr(const Address &local_addr) {
1022   std::array<char, NI_MAXHOST> host;
1023 
1024   auto rv = getnameinfo(&local_addr.su.sa, local_addr.len, host.data(),
1025                         host.size(), nullptr, 0, NI_NUMERICHOST);
1026   if (rv != 0) {
1027     LOG(ERROR) << "getnameinfo: " << gai_strerror(rv);
1028 
1029     return nullptr;
1030   }
1031 
1032   uint16_t port;
1033 
1034   switch (local_addr.su.sa.sa_family) {
1035   case AF_INET:
1036     port = htons(local_addr.su.in.sin_port);
1037 
1038     break;
1039   case AF_INET6:
1040     port = htons(local_addr.su.in6.sin6_port);
1041 
1042     break;
1043   default:
1044     assert(0);
1045   }
1046 
1047   std::array<char, util::max_hostport> hostport_buf;
1048 
1049   auto hostport = util::make_http_hostport(std::begin(hostport_buf),
1050                                            StringRef{host.data()}, port);
1051   const UpstreamAddr *fallback_faddr = nullptr;
1052 
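  // Prefer an exact hostport match below; otherwise fall back to a wildcard
  // listener ("0.0.0.0" or "[::]") with the same family and port.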
1053   for (auto &faddr : quic_upstream_addrs_) {
1054     if (faddr.hostport == hostport) {
1055       return &faddr;
1056     }
1057 
1058     if (faddr.port != port || faddr.family != local_addr.su.sa.sa_family) {
1059       continue;
1060     }
1061 
1062     if (faddr.port == 443 || faddr.port == 80) {
1063       switch (faddr.family) {
1064       case AF_INET:
1065         if (util::streq(faddr.hostport, StringRef::from_lit("0.0.0.0"))) {
1066           fallback_faddr = &faddr;
1067         }
1068 
1069         break;
1070       case AF_INET6:
1071         if (util::streq(faddr.hostport, StringRef::from_lit("[::]"))) {
1072           fallback_faddr = &faddr;
1073         }
1074 
1075         break;
1076       default:
1077         assert(0);
1078       }
1079     } else {
1080       switch (faddr.family) {
1081       case AF_INET:
1082         if (util::starts_with(faddr.hostport,
1083                               StringRef::from_lit("0.0.0.0:"))) {
1084           fallback_faddr = &faddr;
1085         }
1086 
1087         break;
1088       case AF_INET6:
1089         if (util::starts_with(faddr.hostport, StringRef::from_lit("[::]:"))) {
1090           fallback_faddr = &faddr;
1091         }
1092 
1093         break;
1094       default:
1095         assert(0);
1096       }
1097     }
1098   }
1099 
1100   return fallback_faddr;
1101 }
1102 #endif // ENABLE_HTTP3
1103 
1104 namespace {
1105 size_t match_downstream_addr_group_host(
1106     const RouterConfig &routerconf, const StringRef &host,
1107     const StringRef &path,
1108     const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
1109     size_t catch_all, BlockAllocator &balloc) {
1110 
1111   const auto &router = routerconf.router;
1112   const auto &rev_wildcard_router = routerconf.rev_wildcard_router;
1113   const auto &wildcard_patterns = routerconf.wildcard_patterns;
1114 
1115   if (LOG_ENABLED(INFO)) {
1116     LOG(INFO) << "Perform mapping selection, using host=" << host
1117               << ", path=" << path;
1118   }
1119 
1120   auto group = router.match(host, path);
1121   if (group != -1) {
1122     if (LOG_ENABLED(INFO)) {
1123       LOG(INFO) << "Found pattern with query " << host << path
1124                 << ", matched pattern=" << groups[group]->pattern;
1125     }
1126     return group;
1127   }
1128 
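  // Wildcard patterns are stored reversed, so the lookup walks the reversed
  // host with a prefix router to find matching "*.domain" suffixes.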
1129   if (!wildcard_patterns.empty() && !host.empty()) {
1130     auto rev_host_src = make_byte_ref(balloc, host.size() - 1);
1131     auto ep =
1132         std::copy(std::begin(host) + 1, std::end(host), rev_host_src.base);
1133     std::reverse(rev_host_src.base, ep);
1134     auto rev_host = StringRef{rev_host_src.base, ep};
1135 
1136     ssize_t best_group = -1;
1137     const RNode *last_node = nullptr;
1138 
1139     for (;;) {
1140       size_t nread = 0;
1141       auto wcidx =
1142           rev_wildcard_router.match_prefix(&nread, &last_node, rev_host);
1143       if (wcidx == -1) {
1144         break;
1145       }
1146 
1147       rev_host = StringRef{std::begin(rev_host) + nread, std::end(rev_host)};
1148 
1149       auto &wc = wildcard_patterns[wcidx];
1150       auto group = wc.router.match(StringRef{}, path);
1151       if (group != -1) {
1152         // We sorted wildcard_patterns in a way that the first match is
1153         // the longest host pattern.
1154         if (LOG_ENABLED(INFO)) {
1155           LOG(INFO) << "Found wildcard pattern with query " << host << path
1156                     << ", matched pattern=" << groups[group]->pattern;
1157         }
1158 
1159         best_group = group;
1160       }
1161     }
1162 
1163     if (best_group != -1) {
1164       return best_group;
1165     }
1166   }
1167 
1168   group = router.match(StringRef::from_lit(""), path);
1169   if (group != -1) {
1170     if (LOG_ENABLED(INFO)) {
1171       LOG(INFO) << "Found pattern with query " << path
1172                 << ", matched pattern=" << groups[group]->pattern;
1173     }
1174     return group;
1175   }
1176 
1177   if (LOG_ENABLED(INFO)) {
1178     LOG(INFO) << "None match.  Use catch-all pattern";
1179   }
1180   return catch_all;
1181 }
1182 } // namespace
1183 
1184 size_t match_downstream_addr_group(
1185     const RouterConfig &routerconf, const StringRef &hostport,
1186     const StringRef &raw_path,
1187     const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
1188     size_t catch_all, BlockAllocator &balloc) {
1189   if (std::find(std::begin(hostport), std::end(hostport), '/') !=
1190       std::end(hostport)) {
1191     // We use '/' specially, and if '/' is included in host, it breaks
1192     // our code.  Select catch-all case.
1193     return catch_all;
1194   }
1195 
1196   auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
1197   auto query = std::find(std::begin(raw_path), fragment, '?');
1198   auto path = StringRef{std::begin(raw_path), query};
1199 
1200   if (path.empty() || path[0] != '/') {
1201     path = StringRef::from_lit("/");
1202   }
1203 
1204   if (hostport.empty()) {
1205     return match_downstream_addr_group_host(routerconf, hostport, path, groups,
1206                                             catch_all, balloc);
1207   }
1208 
1209   StringRef host;
1210   if (hostport[0] == '[') {
1211     // assume this is IPv6 numeric address
1212     auto p = std::find(std::begin(hostport), std::end(hostport), ']');
1213     if (p == std::end(hostport)) {
1214       return catch_all;
1215     }
1216     if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
1217       return catch_all;
1218     }
1219     host = StringRef{std::begin(hostport), p + 1};
1220   } else {
1221     auto p = std::find(std::begin(hostport), std::end(hostport), ':');
1222     if (p == std::begin(hostport)) {
1223       return catch_all;
1224     }
1225     host = StringRef{std::begin(hostport), p};
1226   }
1227 
1228   if (std::find_if(std::begin(host), std::end(host), [](char c) {
1229         return 'A' <= c && c <= 'Z';
1230       }) != std::end(host)) {
1231     auto low_host = make_byte_ref(balloc, host.size() + 1);
1232     auto ep = std::copy(std::begin(host), std::end(host), low_host.base);
1233     *ep = '\0';
1234     util::inp_strlower(low_host.base, ep);
1235     host = StringRef{low_host.base, ep};
1236   }
1237   return match_downstream_addr_group_host(routerconf, host, path, groups,
1238                                           catch_all, balloc);
1239 }
1240 
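// Records a connect failure for |addr|.  Once the address has failed |fall|
// times in a row it is marked offline, and if |rise| is configured a live
// check is scheduled to bring it back automatically.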
1241 void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
1242   const auto &connect_blocker = addr->connect_blocker;
1243 
1244   if (connect_blocker->in_offline()) {
1245     return;
1246   }
1247 
1248   connect_blocker->on_failure();
1249 
1250   if (addr->fall == 0) {
1251     return;
1252   }
1253 
1254   auto fail_count = connect_blocker->get_fail_count();
1255 
1256   if (fail_count >= addr->fall) {
1257     if (raddr) {
1258       LOG(WARN) << "Could not connect to " << util::to_numeric_addr(raddr)
1259                 << " " << fail_count
1260                 << " times in a row; considered as offline";
1261     } else {
1262       LOG(WARN) << "Could not connect to " << addr->host << ":" << addr->port
1263                 << " " << fail_count
1264                 << " times in a row; considered as offline";
1265     }
1266 
1267     connect_blocker->offline();
1268 
1269     if (addr->rise) {
1270       addr->live_check->schedule();
1271     }
1272   }
1273 }
1274 
1275 #ifdef ENABLE_HTTP3
1276 int create_cid_prefix(uint8_t *cid_prefix) {
1277   if (RAND_bytes(cid_prefix, SHRPX_QUIC_CID_PREFIXLEN) != 1) {
1278     return -1;
1279   }
1280 
1281   return 0;
1282 }
1283 #endif // ENABLE_HTTP3
1284 
1285 } // namespace shrpx
1286