1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2012 Tatsuhiro Tsujikawa
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #ifndef SHRPX_WORKER_H
26 #define SHRPX_WORKER_H
27 
28 #include "shrpx.h"
29 
30 #include <mutex>
31 #include <vector>
32 #include <random>
33 #include <unordered_map>
34 #include <deque>
35 #include <thread>
36 #include <queue>
37 #ifndef NOTHREADS
38 #  include <future>
39 #endif // NOTHREADS
40 
41 #include <openssl/ssl.h>
42 #include <openssl/err.h>
43 
44 #include <ev.h>
45 
46 #include "shrpx_config.h"
47 #include "shrpx_downstream_connection_pool.h"
48 #include "memchunk.h"
49 #include "shrpx_tls.h"
50 #include "shrpx_live_check.h"
51 #include "shrpx_connect_blocker.h"
52 #include "shrpx_dns_tracker.h"
53 #ifdef ENABLE_HTTP3
54 #  include "shrpx_quic_connection_handler.h"
55 #  include "shrpx_quic.h"
56 #endif // ENABLE_HTTP3
57 #include "allocator.h"
58 
59 using namespace nghttp2;
60 
61 namespace shrpx {
62 
// Forward declarations for types that the declarations below refer
// to.
class Http2Session;
class ConnectBlocker;
class MemcachedDispatcher;
struct UpstreamAddr;
class ConnectionHandler;
#ifdef ENABLE_HTTP3
class QUICListener;
#endif // ENABLE_HTTP3

#ifdef HAVE_MRUBY
namespace mruby {

class MRubyContext;

} // namespace mruby
#endif // HAVE_MRUBY

namespace tls {
class CertLookupTree;
} // namespace tls

// Defined later in this header; declared early because
// DownstreamAddr stores a WeightGroup pointer.
struct WeightGroup;
85 
// DownstreamAddr holds the state kept for a single backend address.
struct DownstreamAddr {
  Address addr;
  // backend address.  If |host_unix| is true, this is UNIX domain
  // socket path.
  StringRef host;
  // NOTE(review): presumably |host| combined with |port|; confirm
  // where this field is populated.
  StringRef hostport;
  // backend port.  0 if |host_unix| is true.
  uint16_t port;
  // true if |host| contains UNIX domain socket path.
  bool host_unix;

  // sni field to send remote server if TLS is enabled.
  StringRef sni;

  std::unique_ptr<ConnectBlocker> connect_blocker;
  std::unique_ptr<LiveCheck> live_check;
  // Connection pool for this particular address if session affinity
  // is enabled
  std::unique_ptr<DownstreamConnectionPool> dconn_pool;
  // NOTE(review): |fall| and |rise| are presumably the health check
  // thresholds (consecutive failures before the address is taken
  // offline, consecutive successes before it is brought back);
  // confirm against the backend configuration documentation.
  size_t fall;
  size_t rise;
  // Client side TLS session cache
  tls::TLSSessionCache tls_session_cache;
  // List of Http2Session objects which are not fully utilized (i.e.,
  // the server advertised maximum concurrency is not reached).  We
  // coalesce as many streams as possible into one Http2Session to
  // fully utilize its TCP connection.
  DList<Http2Session> http2_extra_freelist;
  // NOTE(review): presumably the weight group this address belongs
  // to (see |group| and |group_weight|); confirm.
  WeightGroup *wg;
  // total number of streams created in HTTP/2 connections for this
  // address.
  size_t num_dconn;
  // Sequence number of this address.  It is used to randomize the
  // order in which threads access the addresses.
  size_t seq;
  // Application protocol used in this backend
  Proto proto;
  // cycle is used to prioritize this address.  Lower value takes
  // higher priority.
  uint32_t cycle;
  // penalty which is applied to the next cycle calculation.
  uint32_t pending_penalty;
  // Weight of this address inside a weight group.  Its range is [1,
  // 256], inclusive.
  uint32_t weight;
  // name of group which this address belongs to.
  StringRef group;
  // Weight of the weight group which this address belongs to.  Its
  // range is [1, 256], inclusive.
  uint32_t group_weight;
  // true if TLS is used in this backend
  bool tls;
  // true if dynamic DNS is enabled
  bool dns;
  // true if :scheme pseudo header field should be upgraded to secure
  // variant (e.g., "https") when forwarding request to a backend
  // connected by TLS connection.
  bool upgrade_scheme;
  // true if this address is queued.
  bool queued;
};
147 
// Upper bound used for address weights in this header: both
// DownstreamAddr::weight and DownstreamAddr::group_weight are
// documented as lying in [1, 256].  The priority queue comparators
// in this header also derive their cycle comparison window from this
// value.
constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256;
149 
// Element type of WeightGroup::pq.  It pairs a DownstreamAddr
// pointer with the |seq| and |cycle| values that
// DownstreamAddrEntryGreater orders by.
struct DownstreamAddrEntry {
  DownstreamAddr *addr;
  // Tie breaker used when two entries have the same |cycle|.
  size_t seq;
  // Primary ordering key.
  uint32_t cycle;
};
155 
156 struct DownstreamAddrEntryGreater {
operatorDownstreamAddrEntryGreater157   bool operator()(const DownstreamAddrEntry &lhs,
158                   const DownstreamAddrEntry &rhs) const {
159     auto d = lhs.cycle - rhs.cycle;
160     if (d == 0) {
161       return rhs.seq < lhs.seq;
162     }
163     return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
164   }
165 };
166 
// WeightGroup keeps a priority queue of DownstreamAddrEntry together
// with the scheduling state of the group itself.
struct WeightGroup {
  // Queue of address entries ordered by DownstreamAddrEntryGreater.
  std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
                      DownstreamAddrEntryGreater>
      pq;
  // NOTE(review): |seq|, |weight|, |cycle| and |pending_penalty|
  // presumably play the same roles for the group as the fields of the
  // same name do in DownstreamAddr; confirm against the scheduling
  // code.
  size_t seq;
  uint32_t weight;
  uint32_t cycle;
  uint32_t pending_penalty;
  // true if this object is queued.
  bool queued;
};
178 
// Element type of SharedDownstreamAddr::pq.  It pairs a WeightGroup
// pointer with the |seq| and |cycle| values that
// WeightGroupEntryGreater orders by.
struct WeightGroupEntry {
  WeightGroup *wg;
  // Tie breaker used when two entries have the same |cycle|.
  size_t seq;
  // Primary ordering key.
  uint32_t cycle;
};
184 
185 struct WeightGroupEntryGreater {
operatorWeightGroupEntryGreater186   bool operator()(const WeightGroupEntry &lhs,
187                   const WeightGroupEntry &rhs) const {
188     auto d = lhs.cycle - rhs.cycle;
189     if (d == 0) {
190       return rhs.seq < lhs.seq;
191     }
192     return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
193   }
194 };
195 
196 struct SharedDownstreamAddr {
SharedDownstreamAddrSharedDownstreamAddr197   SharedDownstreamAddr()
198       : balloc(1024, 1024),
199         affinity{SessionAffinity::NONE},
200         redirect_if_not_tls{false},
201         dnf{false},
202         timeout{} {}
203 
204   SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
205   SharedDownstreamAddr(SharedDownstreamAddr &&) = delete;
206   SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete;
207   SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete;
208 
209   BlockAllocator balloc;
210   std::vector<DownstreamAddr> addrs;
211   std::vector<WeightGroup> wgs;
212   std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
213                       WeightGroupEntryGreater>
214       pq;
215   // Bunch of session affinity hash.  Only used if affinity ==
216   // SessionAffinity::IP.
217   std::vector<AffinityHash> affinity_hash;
218 #ifdef HAVE_MRUBY
219   std::shared_ptr<mruby::MRubyContext> mruby_ctx;
220 #endif // HAVE_MRUBY
221   // Configuration for session affinity
222   AffinityConfig affinity;
223   // Session affinity
224   // true if this group requires that client connection must be TLS,
225   // and the request must be redirected to https URI.
226   bool redirect_if_not_tls;
227   // true if a request should not be forwarded to a backend.
228   bool dnf;
229   // Timeouts for backend connection.
230   struct {
231     ev_tstamp read;
232     ev_tstamp write;
233   } timeout;
234 };
235 
// DownstreamAddrGroup associates a pattern with a
// SharedDownstreamAddr.  It is neither copyable nor movable.
struct DownstreamAddrGroup {
  DownstreamAddrGroup();
  ~DownstreamAddrGroup();

  DownstreamAddrGroup(const DownstreamAddrGroup &) = delete;
  DownstreamAddrGroup(DownstreamAddrGroup &&) = delete;
  DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete;
  DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete;

  // NOTE(review): presumably the routing pattern which selects this
  // group; confirm against match_downstream_addr_group().
  ImmutableString pattern;
  std::shared_ptr<SharedDownstreamAddr> shared_addr;
  // true if this group is no longer used for new requests.  If this
  // is true, a connection made using one of the addresses in
  // shared_addr must not be pooled.
  bool retired;
};
252 
// Counters kept per Worker and exposed via Worker::get_worker_stat().
struct WorkerStat {
  // NOTE(review): presumably the number of connections currently
  // handled by the worker; confirm where it is updated.
  size_t num_connections;
  // NOTE(review): presumably the number of connections waiting to be
  // closed; confirm where it is updated.
  size_t num_close_waits;
};
257 
258 #ifdef ENABLE_HTTP3
259 struct QUICPacket {
QUICPacketQUICPacket260   QUICPacket(size_t upstream_addr_index, const Address &remote_addr,
261              const Address &local_addr, const uint8_t *data, size_t datalen)
262       : upstream_addr_index{upstream_addr_index},
263         remote_addr{remote_addr},
264         local_addr{local_addr},
265         data{data, data + datalen} {}
QUICPacketQUICPacket266   QUICPacket() {}
267   size_t upstream_addr_index;
268   Address remote_addr;
269   Address local_addr;
270   std::vector<uint8_t> data;
271 };
272 #endif // ENABLE_HTTP3
273 
// Kind of a WorkerEvent; stored in WorkerEvent::type.
enum class WorkerEventType {
  NEW_CONNECTION = 0x01,
  REOPEN_LOG = 0x02,
  GRACEFUL_SHUTDOWN = 0x03,
  REPLACE_DOWNSTREAM = 0x04,
#ifdef ENABLE_HTTP3
  // Only available when HTTP/3 support is compiled in.
  QUIC_PKT_FORWARD = 0x05,
#endif // ENABLE_HTTP3
};
283 
// WorkerEvent is the message type accepted by Worker::send() and
// stored in the Worker's event queue.
// NOTE(review): which of the payload fields below is meaningful
// presumably depends on |type|; confirm in the code which produces
// and consumes these events.
struct WorkerEvent {
  WorkerEventType type;
  // Client connection details.  The members of this anonymous struct
  // are accessed directly as members of WorkerEvent.
  struct {
    sockaddr_union client_addr;
    size_t client_addrlen;
    int client_fd;
    const UpstreamAddr *faddr;
  };
  std::shared_ptr<TicketKeys> ticket_keys;
  std::shared_ptr<DownstreamConfig> downstreamconf;
#ifdef ENABLE_HTTP3
  std::unique_ptr<QUICPacket> quic_pkt;
#endif // ENABLE_HTTP3
};
298 
// Worker holds the per-worker state declared below: a pointer to the
// event loop, a queue of WorkerEvent, a memory chunk pool, a DNS
// tracker, TLS contexts and certificate lookup trees, ticket keys,
// and the downstream (backend) address groups.
class Worker {
public:
  // The QUIC related parameters exist only if ENABLE_HTTP3 is
  // defined, and |index| only if HAVE_LIBBPF is defined as well.
  Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
         SSL_CTX *tls_session_cache_memcached_ssl_ctx,
         tls::CertLookupTree *cert_tree,
#ifdef ENABLE_HTTP3
         SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
         const uint8_t *cid_prefix, size_t cid_prefixlen,
#  ifdef HAVE_LIBBPF
         size_t index,
#  endif // HAVE_LIBBPF
#endif   // ENABLE_HTTP3
         const std::shared_ptr<TicketKeys> &ticket_keys,
         ConnectionHandler *conn_handler,
         std::shared_ptr<DownstreamConfig> downstreamconf);
  ~Worker();
  // NOTE(review): presumably run_async() starts this worker's event
  // loop asynchronously (see |fut_|) and wait() blocks until it has
  // finished; confirm in the implementation.
  void run_async();
  void wait();
  // NOTE(review): presumably send() queues |event| for this worker
  // and process_events() consumes the queued events; confirm in the
  // implementation.
  void process_events();
  void send(WorkerEvent event);

  tls::CertLookupTree *get_cert_lookup_tree() const;
#ifdef ENABLE_HTTP3
  tls::CertLookupTree *get_quic_cert_lookup_tree() const;
#endif // ENABLE_HTTP3

  // These 2 functions make a lock m_ to get/set ticket keys
  // atomically.
  // NOTE(review): a dedicated mutex ticket_keys_m_ is declared below
  // when HAVE_ATOMIC_STD_SHARED_PTR is not defined; confirm which
  // synchronization is actually used.
  std::shared_ptr<TicketKeys> get_ticket_keys();
  void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);

  WorkerStat *get_worker_stat();
  struct ev_loop *get_loop() const;
  SSL_CTX *get_sv_ssl_ctx() const;
  SSL_CTX *get_cl_ssl_ctx() const;
#ifdef ENABLE_HTTP3
  SSL_CTX *get_quic_sv_ssl_ctx() const;
#endif // ENABLE_HTTP3

  void set_graceful_shutdown(bool f);
  bool get_graceful_shutdown() const;

  MemchunkPool *get_mcpool();
  // NOTE(review): presumably arms mcpool_clear_timer_ so that the
  // memory chunk pool is cleared later; confirm in the
  // implementation.
  void schedule_clear_mcpool();

  MemcachedDispatcher *get_session_cache_memcached_dispatcher();

  std::mt19937 &get_randgen();

#ifdef HAVE_MRUBY
  int create_mruby_context();

  mruby::MRubyContext *get_mruby_context() const;
#endif // HAVE_MRUBY

  std::vector<std::shared_ptr<DownstreamAddrGroup>> &
  get_downstream_addr_groups();

  ConnectBlocker *get_connect_blocker() const;

  const DownstreamConfig *get_downstream_config() const;

  void
  replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);

  ConnectionHandler *get_connection_handler() const;

#ifdef ENABLE_HTTP3
  QUICConnectionHandler *get_quic_connection_handler();

  int setup_quic_server_socket();

  const uint8_t *get_cid_prefix() const;

#  ifdef HAVE_LIBBPF
  bool should_attach_bpf() const;

  bool should_update_bpf_map() const;

  uint32_t compute_sk_index() const;
#  endif // HAVE_LIBBPF

  int create_quic_server_socket(UpstreamAddr &addr);

  // Returns a pointer to UpstreamAddr which matches |local_addr|.
  const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr);
#endif // ENABLE_HTTP3

  DNSTracker *get_dns_tracker();

private:
#ifndef NOTHREADS
  std::future<void> fut_;
#endif // NOTHREADS
#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
  // Unique index of this worker.
  size_t index_;
#endif // ENABLE_HTTP3 && HAVE_LIBBPF
  std::mutex m_;
  // Queue of events for this worker.
  std::deque<WorkerEvent> q_;
  std::mt19937 randgen_;
  ev_async w_;
  ev_timer mcpool_clear_timer_;
  ev_timer proc_wev_timer_;
  MemchunkPool mcpool_;
  WorkerStat worker_stat_;
  DNSTracker dns_tracker_;

#ifdef ENABLE_HTTP3
  std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix_;
  std::vector<UpstreamAddr> quic_upstream_addrs_;
  std::vector<std::unique_ptr<QUICListener>> quic_listeners_;
#endif // ENABLE_HTTP3

  std::shared_ptr<DownstreamConfig> downstreamconf_;
  std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
#ifdef HAVE_MRUBY
  std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
#endif // HAVE_MRUBY
  struct ev_loop *loop_;

  // Following fields are shared across threads if
  // get_config()->tls_ctx_per_worker == true.
  // NOTE(review): one would expect these to be shared when
  // tls_ctx_per_worker is false rather than true; confirm the
  // condition stated above.
  SSL_CTX *sv_ssl_ctx_;
  SSL_CTX *cl_ssl_ctx_;
  tls::CertLookupTree *cert_tree_;
  ConnectionHandler *conn_handler_;
#ifdef ENABLE_HTTP3
  SSL_CTX *quic_sv_ssl_ctx_;
  tls::CertLookupTree *quic_cert_tree_;

  QUICConnectionHandler quic_conn_handler_;
#endif // ENABLE_HTTP3

#ifndef HAVE_ATOMIC_STD_SHARED_PTR
  std::mutex ticket_keys_m_;
#endif // !HAVE_ATOMIC_STD_SHARED_PTR
  std::shared_ptr<TicketKeys> ticket_keys_;
  std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
  // Worker level blocker for downstream connection.  For example,
  // this is used when file descriptor is exhausted.
  std::unique_ptr<ConnectBlocker> connect_blocker_;

  bool graceful_shutdown_;
};
444 
// Selects group based on request's |hostport| and |path|.  |hostport|
// is the value taken from :authority or host header field, and may
// contain port.  The |path| may contain query part.  We require the
// catch-all pattern in place, so this function always selects one
// group.  The catch-all group index is given in |catch_all|.  All
// patterns are given in |groups|.
//
// NOTE(review): the return value is presumably the index of the
// selected group in |groups|, and |balloc| is presumably used for
// temporary allocations made while matching; confirm in the
// implementation.
size_t match_downstream_addr_group(
    const RouterConfig &routerconfig, const StringRef &hostport,
    const StringRef &path,
    const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
    size_t catch_all, BlockAllocator &balloc);
456 
// Call this function if connecting to backend failed.  |addr| is the
// backend address for which the failure is reported.  |raddr| is the
// actual address used to connect to backend, and it could be
// nullptr.  This function may schedule live check.
void downstream_failure(DownstreamAddr *addr, const Address *raddr);
461 
462 #ifdef ENABLE_HTTP3
// Creates an unpredictable sequence of SHRPX_QUIC_CID_PREFIXLEN bytes
// which is used as a prefix of QUIC Connection ID.  This function
// returns -1 on failure.  |server_id| must be 2 bytes long.
//
// NOTE(review): the result is presumably written to |cid_prefix|,
// which then has to provide room for SHRPX_QUIC_CID_PREFIXLEN bytes,
// and the return value on success is presumably 0; confirm in the
// implementation.
int create_cid_prefix(uint8_t *cid_prefix, const uint8_t *server_id);
467 #endif // ENABLE_HTTP3
468 
469 } // namespace shrpx
470 
471 #endif // SHRPX_WORKER_H
472