1 /* 2 * nghttp2 - HTTP/2 C Library 3 * 4 * Copyright (c) 2012 Tatsuhiro Tsujikawa 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining 7 * a copy of this software and associated documentation files (the 8 * "Software"), to deal in the Software without restriction, including 9 * without limitation the rights to use, copy, modify, merge, publish, 10 * distribute, sublicense, and/or sell copies of the Software, and to 11 * permit persons to whom the Software is furnished to do so, subject to 12 * the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be 15 * included in all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 24 */ 25 #ifndef SHRPX_WORKER_H 26 #define SHRPX_WORKER_H 27 28 #include "shrpx.h" 29 30 #include <mutex> 31 #include <vector> 32 #include <random> 33 #include <unordered_map> 34 #include <deque> 35 #include <thread> 36 #include <queue> 37 #ifndef NOTHREADS 38 # include <future> 39 #endif // NOTHREADS 40 41 #include <openssl/ssl.h> 42 #include <openssl/err.h> 43 44 #include <ev.h> 45 46 #include "shrpx_config.h" 47 #include "shrpx_downstream_connection_pool.h" 48 #include "memchunk.h" 49 #include "shrpx_tls.h" 50 #include "shrpx_live_check.h" 51 #include "shrpx_connect_blocker.h" 52 #include "shrpx_dns_tracker.h" 53 #ifdef ENABLE_HTTP3 54 # include "shrpx_quic_connection_handler.h" 55 # include "shrpx_quic.h" 56 #endif // ENABLE_HTTP3 57 #include "allocator.h" 58 59 using namespace nghttp2; 60 61 namespace shrpx { 62 63 class Http2Session; 64 class ConnectBlocker; 65 class MemcachedDispatcher; 66 struct UpstreamAddr; 67 class ConnectionHandler; 68 #ifdef ENABLE_HTTP3 69 class QUICListener; 70 #endif // ENABLE_HTTP3 71 72 #ifdef HAVE_MRUBY 73 namespace mruby { 74 75 class MRubyContext; 76 77 } // namespace mruby 78 #endif // HAVE_MRUBY 79 80 namespace tls { 81 class CertLookupTree; 82 } // namespace tls 83 84 struct WeightGroup; 85 86 struct DownstreamAddr { 87 Address addr; 88 // backend address. If |host_unix| is true, this is UNIX domain 89 // socket path. 90 StringRef host; 91 StringRef hostport; 92 // backend port. 0 if |host_unix| is true. 93 uint16_t port; 94 // true if |host| contains UNIX domain socket path. 95 bool host_unix; 96 97 // sni field to send remote server if TLS is enabled. 98 StringRef sni; 99 100 std::unique_ptr<ConnectBlocker> connect_blocker; 101 std::unique_ptr<LiveCheck> live_check; 102 // Connection pool for this particular address if session affinity 103 // is enabled 104 std::unique_ptr<DownstreamConnectionPool> dconn_pool; 105 size_t fall; 106 size_t rise; 107 // Client side TLS session cache 108 tls::TLSSessionCache tls_session_cache; 109 // List of Http2Session which is not fully utilized (i.e., the 110 // server advertised maximum concurrency is not reached). We will 111 // coalesce as much stream as possible in one Http2Session to fully 112 // utilize TCP connection. 113 DList<Http2Session> http2_extra_freelist; 114 WeightGroup *wg; 115 // total number of streams created in HTTP/2 connections for this 116 // address. 117 size_t num_dconn; 118 // the sequence number of this address to randomize the order access 119 // threads. 120 size_t seq; 121 // Application protocol used in this backend 122 Proto proto; 123 // cycle is used to prioritize this address. Lower value takes 124 // higher priority. 125 uint32_t cycle; 126 // penalty which is applied to the next cycle calculation. 127 uint32_t pending_penalty; 128 // Weight of this address inside a weight group. Its range is [1, 129 // 256], inclusive. 130 uint32_t weight; 131 // name of group which this address belongs to. 132 StringRef group; 133 // Weight of the weight group which this address belongs to. Its 134 // range is [1, 256], inclusive. 135 uint32_t group_weight; 136 // true if TLS is used in this backend 137 bool tls; 138 // true if dynamic DNS is enabled 139 bool dns; 140 // true if :scheme pseudo header field should be upgraded to secure 141 // variant (e.g., "https") when forwarding request to a backend 142 // connected by TLS connection. 143 bool upgrade_scheme; 144 // true if this address is queued. 145 bool queued; 146 }; 147 148 constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256; 149 150 struct DownstreamAddrEntry { 151 DownstreamAddr *addr; 152 size_t seq; 153 uint32_t cycle; 154 }; 155 156 struct DownstreamAddrEntryGreater { operatorDownstreamAddrEntryGreater157 bool operator()(const DownstreamAddrEntry &lhs, 158 const DownstreamAddrEntry &rhs) const { 159 auto d = lhs.cycle - rhs.cycle; 160 if (d == 0) { 161 return rhs.seq < lhs.seq; 162 } 163 return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1; 164 } 165 }; 166 167 struct WeightGroup { 168 std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>, 169 DownstreamAddrEntryGreater> 170 pq; 171 size_t seq; 172 uint32_t weight; 173 uint32_t cycle; 174 uint32_t pending_penalty; 175 // true if this object is queued. 176 bool queued; 177 }; 178 179 struct WeightGroupEntry { 180 WeightGroup *wg; 181 size_t seq; 182 uint32_t cycle; 183 }; 184 185 struct WeightGroupEntryGreater { operatorWeightGroupEntryGreater186 bool operator()(const WeightGroupEntry &lhs, 187 const WeightGroupEntry &rhs) const { 188 auto d = lhs.cycle - rhs.cycle; 189 if (d == 0) { 190 return rhs.seq < lhs.seq; 191 } 192 return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1; 193 } 194 }; 195 196 struct SharedDownstreamAddr { SharedDownstreamAddrSharedDownstreamAddr197 SharedDownstreamAddr() 198 : balloc(1024, 1024), 199 affinity{SessionAffinity::NONE}, 200 redirect_if_not_tls{false}, 201 dnf{false}, 202 timeout{} {} 203 204 SharedDownstreamAddr(const SharedDownstreamAddr &) = delete; 205 SharedDownstreamAddr(SharedDownstreamAddr &&) = delete; 206 SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete; 207 SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete; 208 209 BlockAllocator balloc; 210 std::vector<DownstreamAddr> addrs; 211 std::vector<WeightGroup> wgs; 212 std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>, 213 WeightGroupEntryGreater> 214 pq; 215 // Bunch of session affinity hash. Only used if affinity == 216 // SessionAffinity::IP. 217 std::vector<AffinityHash> affinity_hash; 218 #ifdef HAVE_MRUBY 219 std::shared_ptr<mruby::MRubyContext> mruby_ctx; 220 #endif // HAVE_MRUBY 221 // Configuration for session affinity 222 AffinityConfig affinity; 223 // Session affinity 224 // true if this group requires that client connection must be TLS, 225 // and the request must be redirected to https URI. 226 bool redirect_if_not_tls; 227 // true if a request should not be forwarded to a backend. 228 bool dnf; 229 // Timeouts for backend connection. 230 struct { 231 ev_tstamp read; 232 ev_tstamp write; 233 } timeout; 234 }; 235 236 struct DownstreamAddrGroup { 237 DownstreamAddrGroup(); 238 ~DownstreamAddrGroup(); 239 240 DownstreamAddrGroup(const DownstreamAddrGroup &) = delete; 241 DownstreamAddrGroup(DownstreamAddrGroup &&) = delete; 242 DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete; 243 DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete; 244 245 ImmutableString pattern; 246 std::shared_ptr<SharedDownstreamAddr> shared_addr; 247 // true if this group is no longer used for new request. If this is 248 // true, the connection made using one of address in shared_addr 249 // must not be pooled. 250 bool retired; 251 }; 252 253 struct WorkerStat { 254 size_t num_connections; 255 size_t num_close_waits; 256 }; 257 258 #ifdef ENABLE_HTTP3 259 struct QUICPacket { QUICPacketQUICPacket260 QUICPacket(size_t upstream_addr_index, const Address &remote_addr, 261 const Address &local_addr, const uint8_t *data, size_t datalen) 262 : upstream_addr_index{upstream_addr_index}, 263 remote_addr{remote_addr}, 264 local_addr{local_addr}, 265 data{data, data + datalen} {} QUICPacketQUICPacket266 QUICPacket() {} 267 size_t upstream_addr_index; 268 Address remote_addr; 269 Address local_addr; 270 std::vector<uint8_t> data; 271 }; 272 #endif // ENABLE_HTTP3 273 274 enum class WorkerEventType { 275 NEW_CONNECTION = 0x01, 276 REOPEN_LOG = 0x02, 277 GRACEFUL_SHUTDOWN = 0x03, 278 REPLACE_DOWNSTREAM = 0x04, 279 #ifdef ENABLE_HTTP3 280 QUIC_PKT_FORWARD = 0x05, 281 #endif // ENABLE_HTTP3 282 }; 283 284 struct WorkerEvent { 285 WorkerEventType type; 286 struct { 287 sockaddr_union client_addr; 288 size_t client_addrlen; 289 int client_fd; 290 const UpstreamAddr *faddr; 291 }; 292 std::shared_ptr<TicketKeys> ticket_keys; 293 std::shared_ptr<DownstreamConfig> downstreamconf; 294 #ifdef ENABLE_HTTP3 295 std::unique_ptr<QUICPacket> quic_pkt; 296 #endif // ENABLE_HTTP3 297 }; 298 299 class Worker { 300 public: 301 Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, 302 SSL_CTX *tls_session_cache_memcached_ssl_ctx, 303 tls::CertLookupTree *cert_tree, 304 #ifdef ENABLE_HTTP3 305 SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree, 306 const uint8_t *cid_prefix, size_t cid_prefixlen, 307 # ifdef HAVE_LIBBPF 308 size_t index, 309 # endif // HAVE_LIBBPF 310 #endif // ENABLE_HTTP3 311 const std::shared_ptr<TicketKeys> &ticket_keys, 312 ConnectionHandler *conn_handler, 313 std::shared_ptr<DownstreamConfig> downstreamconf); 314 ~Worker(); 315 void run_async(); 316 void wait(); 317 void process_events(); 318 void send(WorkerEvent event); 319 320 tls::CertLookupTree *get_cert_lookup_tree() const; 321 #ifdef ENABLE_HTTP3 322 tls::CertLookupTree *get_quic_cert_lookup_tree() const; 323 #endif // ENABLE_HTTP3 324 325 // These 2 functions make a lock m_ to get/set ticket keys 326 // atomically. 327 std::shared_ptr<TicketKeys> get_ticket_keys(); 328 void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys); 329 330 WorkerStat *get_worker_stat(); 331 struct ev_loop *get_loop() const; 332 SSL_CTX *get_sv_ssl_ctx() const; 333 SSL_CTX *get_cl_ssl_ctx() const; 334 #ifdef ENABLE_HTTP3 335 SSL_CTX *get_quic_sv_ssl_ctx() const; 336 #endif // ENABLE_HTTP3 337 338 void set_graceful_shutdown(bool f); 339 bool get_graceful_shutdown() const; 340 341 MemchunkPool *get_mcpool(); 342 void schedule_clear_mcpool(); 343 344 MemcachedDispatcher *get_session_cache_memcached_dispatcher(); 345 346 std::mt19937 &get_randgen(); 347 348 #ifdef HAVE_MRUBY 349 int create_mruby_context(); 350 351 mruby::MRubyContext *get_mruby_context() const; 352 #endif // HAVE_MRUBY 353 354 std::vector<std::shared_ptr<DownstreamAddrGroup>> & 355 get_downstream_addr_groups(); 356 357 ConnectBlocker *get_connect_blocker() const; 358 359 const DownstreamConfig *get_downstream_config() const; 360 361 void 362 replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf); 363 364 ConnectionHandler *get_connection_handler() const; 365 366 #ifdef ENABLE_HTTP3 367 QUICConnectionHandler *get_quic_connection_handler(); 368 369 int setup_quic_server_socket(); 370 371 const uint8_t *get_cid_prefix() const; 372 373 # ifdef HAVE_LIBBPF 374 bool should_attach_bpf() const; 375 376 bool should_update_bpf_map() const; 377 378 uint32_t compute_sk_index() const; 379 # endif // HAVE_LIBBPF 380 381 int create_quic_server_socket(UpstreamAddr &addr); 382 383 // Returns a pointer to UpstreamAddr which matches |local_addr|. 384 const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr); 385 #endif // ENABLE_HTTP3 386 387 DNSTracker *get_dns_tracker(); 388 389 private: 390 #ifndef NOTHREADS 391 std::future<void> fut_; 392 #endif // NOTHREADS 393 #if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF) 394 // Unique index of this worker. 395 size_t index_; 396 #endif // ENABLE_HTTP3 && HAVE_LIBBPF 397 std::mutex m_; 398 std::deque<WorkerEvent> q_; 399 std::mt19937 randgen_; 400 ev_async w_; 401 ev_timer mcpool_clear_timer_; 402 ev_timer proc_wev_timer_; 403 MemchunkPool mcpool_; 404 WorkerStat worker_stat_; 405 DNSTracker dns_tracker_; 406 407 #ifdef ENABLE_HTTP3 408 std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix_; 409 std::vector<UpstreamAddr> quic_upstream_addrs_; 410 std::vector<std::unique_ptr<QUICListener>> quic_listeners_; 411 #endif // ENABLE_HTTP3 412 413 std::shared_ptr<DownstreamConfig> downstreamconf_; 414 std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_; 415 #ifdef HAVE_MRUBY 416 std::unique_ptr<mruby::MRubyContext> mruby_ctx_; 417 #endif // HAVE_MRUBY 418 struct ev_loop *loop_; 419 420 // Following fields are shared across threads if 421 // get_config()->tls_ctx_per_worker == true. 422 SSL_CTX *sv_ssl_ctx_; 423 SSL_CTX *cl_ssl_ctx_; 424 tls::CertLookupTree *cert_tree_; 425 ConnectionHandler *conn_handler_; 426 #ifdef ENABLE_HTTP3 427 SSL_CTX *quic_sv_ssl_ctx_; 428 tls::CertLookupTree *quic_cert_tree_; 429 430 QUICConnectionHandler quic_conn_handler_; 431 #endif // ENABLE_HTTP3 432 433 #ifndef HAVE_ATOMIC_STD_SHARED_PTR 434 std::mutex ticket_keys_m_; 435 #endif // !HAVE_ATOMIC_STD_SHARED_PTR 436 std::shared_ptr<TicketKeys> ticket_keys_; 437 std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_; 438 // Worker level blocker for downstream connection. For example, 439 // this is used when file decriptor is exhausted. 440 std::unique_ptr<ConnectBlocker> connect_blocker_; 441 442 bool graceful_shutdown_; 443 }; 444 445 // Selects group based on request's |hostport| and |path|. |hostport| 446 // is the value taken from :authority or host header field, and may 447 // contain port. The |path| may contain query part. We require the 448 // catch-all pattern in place, so this function always selects one 449 // group. The catch-all group index is given in |catch_all|. All 450 // patterns are given in |groups|. 451 size_t match_downstream_addr_group( 452 const RouterConfig &routerconfig, const StringRef &hostport, 453 const StringRef &path, 454 const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups, 455 size_t catch_all, BlockAllocator &balloc); 456 457 // Calls this function if connecting to backend failed. |raddr| is 458 // the actual address used to connect to backend, and it could be 459 // nullptr. This function may schedule live check. 460 void downstream_failure(DownstreamAddr *addr, const Address *raddr); 461 462 #ifdef ENABLE_HTTP3 463 // Creates unpredictable SHRPX_QUIC_CID_PREFIXLEN bytes sequence which 464 // is used as a prefix of QUIC Connection ID. This function returns 465 // -1 on failure. |server_id| must be 2 bytes long. 466 int create_cid_prefix(uint8_t *cid_prefix, const uint8_t *server_id); 467 #endif // ENABLE_HTTP3 468 469 } // namespace shrpx 470 471 #endif // SHRPX_WORKER_H 472