1// Copyright 2014 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5// TODO: turn off the serve goroutine when idle, so 6// an idle conn only has the readFrames goroutine active. (which could 7// also be optimized probably to pin less memory in crypto/tls). This 8// would involve tracking when the serve goroutine is active (atomic 9// int32 read/CAS probably?) and starting it up when frames arrive, 10// and shutting it down when all handlers exit. the occasional PING 11// packets could use time.AfterFunc to call sc.wakeStartServeLoop() 12// (which is a no-op if already running) and then queue the PING write 13// as normal. The serve loop would then exit in most cases (if no 14// Handlers running) and not be woken up again until the PING packet 15// returns. 16 17// TODO (maybe): add a mechanism for Handlers to going into 18// half-closed-local mode (rw.(io.Closer) test?) but not exit their 19// handler, and continue to be able to read from the 20// Request.Body. This would be a somewhat semantic change from HTTP/1 21// (or at least what we expose in net/http), so I'd probably want to 22// add it there too. For now, this package says that returning from 23// the Handler ServeHTTP function means you're both done reading and 24// done writing, without a way to stop just one or the other. 25 26package http2 27 28import ( 29 "bufio" 30 "bytes" 31 "context" 32 "crypto/tls" 33 "errors" 34 "fmt" 35 "io" 36 "log" 37 "math" 38 "net" 39 "net/http" 40 "net/textproto" 41 "net/url" 42 "os" 43 "reflect" 44 "runtime" 45 "strconv" 46 "strings" 47 "sync" 48 "time" 49 50 "golang.org/x/net/http/httpguts" 51 "golang.org/x/net/http2/hpack" 52) 53 54const ( 55 prefaceTimeout = 10 * time.Second 56 firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway 57 handlerChunkWriteSize = 4 << 10 58 defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to? 59 maxQueuedControlFrames = 10000 60) 61 62var ( 63 errClientDisconnected = errors.New("client disconnected") 64 errClosedBody = errors.New("body closed by handler") 65 errHandlerComplete = errors.New("http2: request body closed due to handler exiting") 66 errStreamClosed = errors.New("http2: stream closed") 67) 68 69var responseWriterStatePool = sync.Pool{ 70 New: func() interface{} { 71 rws := &responseWriterState{} 72 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize) 73 return rws 74 }, 75} 76 77// Test hooks. 78var ( 79 testHookOnConn func() 80 testHookGetServerConn func(*serverConn) 81 testHookOnPanicMu *sync.Mutex // nil except in tests 82 testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool) 83) 84 85// Server is an HTTP/2 server. 86type Server struct { 87 // MaxHandlers limits the number of http.Handler ServeHTTP goroutines 88 // which may run at a time over all connections. 89 // Negative or zero no limit. 90 // TODO: implement 91 MaxHandlers int 92 93 // MaxConcurrentStreams optionally specifies the number of 94 // concurrent streams that each client may have open at a 95 // time. This is unrelated to the number of http.Handler goroutines 96 // which may be active globally, which is MaxHandlers. 97 // If zero, MaxConcurrentStreams defaults to at least 100, per 98 // the HTTP/2 spec's recommendations. 99 MaxConcurrentStreams uint32 100 101 // MaxReadFrameSize optionally specifies the largest frame 102 // this server is willing to read. A valid value is between 103 // 16k and 16M, inclusive. If zero or otherwise invalid, a 104 // default value is used. 105 MaxReadFrameSize uint32 106 107 // PermitProhibitedCipherSuites, if true, permits the use of 108 // cipher suites prohibited by the HTTP/2 spec. 109 PermitProhibitedCipherSuites bool 110 111 // IdleTimeout specifies how long until idle clients should be 112 // closed with a GOAWAY frame. PING frames are not considered 113 // activity for the purposes of IdleTimeout. 114 IdleTimeout time.Duration 115 116 // MaxUploadBufferPerConnection is the size of the initial flow 117 // control window for each connections. The HTTP/2 spec does not 118 // allow this to be smaller than 65535 or larger than 2^32-1. 119 // If the value is outside this range, a default value will be 120 // used instead. 121 MaxUploadBufferPerConnection int32 122 123 // MaxUploadBufferPerStream is the size of the initial flow control 124 // window for each stream. The HTTP/2 spec does not allow this to 125 // be larger than 2^32-1. If the value is zero or larger than the 126 // maximum, a default value will be used instead. 127 MaxUploadBufferPerStream int32 128 129 // NewWriteScheduler constructs a write scheduler for a connection. 130 // If nil, a default scheduler is chosen. 131 NewWriteScheduler func() WriteScheduler 132 133 // Internal state. This is a pointer (rather than embedded directly) 134 // so that we don't embed a Mutex in this struct, which will make the 135 // struct non-copyable, which might break some callers. 136 state *serverInternalState 137} 138 139func (s *Server) initialConnRecvWindowSize() int32 { 140 if s.MaxUploadBufferPerConnection > initialWindowSize { 141 return s.MaxUploadBufferPerConnection 142 } 143 return 1 << 20 144} 145 146func (s *Server) initialStreamRecvWindowSize() int32 { 147 if s.MaxUploadBufferPerStream > 0 { 148 return s.MaxUploadBufferPerStream 149 } 150 return 1 << 20 151} 152 153func (s *Server) maxReadFrameSize() uint32 { 154 if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize { 155 return v 156 } 157 return defaultMaxReadFrameSize 158} 159 160func (s *Server) maxConcurrentStreams() uint32 { 161 if v := s.MaxConcurrentStreams; v > 0 { 162 return v 163 } 164 return defaultMaxStreams 165} 166 167// maxQueuedControlFrames is the maximum number of control frames like 168// SETTINGS, PING and RST_STREAM that will be queued for writing before 169// the connection is closed to prevent memory exhaustion attacks. 170func (s *Server) maxQueuedControlFrames() int { 171 // TODO: if anybody asks, add a Server field, and remember to define the 172 // behavior of negative values. 173 return maxQueuedControlFrames 174} 175 176type serverInternalState struct { 177 mu sync.Mutex 178 activeConns map[*serverConn]struct{} 179} 180 181func (s *serverInternalState) registerConn(sc *serverConn) { 182 if s == nil { 183 return // if the Server was used without calling ConfigureServer 184 } 185 s.mu.Lock() 186 s.activeConns[sc] = struct{}{} 187 s.mu.Unlock() 188} 189 190func (s *serverInternalState) unregisterConn(sc *serverConn) { 191 if s == nil { 192 return // if the Server was used without calling ConfigureServer 193 } 194 s.mu.Lock() 195 delete(s.activeConns, sc) 196 s.mu.Unlock() 197} 198 199func (s *serverInternalState) startGracefulShutdown() { 200 if s == nil { 201 return // if the Server was used without calling ConfigureServer 202 } 203 s.mu.Lock() 204 for sc := range s.activeConns { 205 sc.startGracefulShutdown() 206 } 207 s.mu.Unlock() 208} 209 210// ConfigureServer adds HTTP/2 support to a net/http Server. 211// 212// The configuration conf may be nil. 213// 214// ConfigureServer must be called before s begins serving. 215func ConfigureServer(s *http.Server, conf *Server) error { 216 if s == nil { 217 panic("nil *http.Server") 218 } 219 if conf == nil { 220 conf = new(Server) 221 } 222 conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})} 223 if h1, h2 := s, conf; h2.IdleTimeout == 0 { 224 if h1.IdleTimeout != 0 { 225 h2.IdleTimeout = h1.IdleTimeout 226 } else { 227 h2.IdleTimeout = h1.ReadTimeout 228 } 229 } 230 s.RegisterOnShutdown(conf.state.startGracefulShutdown) 231 232 if s.TLSConfig == nil { 233 s.TLSConfig = new(tls.Config) 234 } else if s.TLSConfig.CipherSuites != nil { 235 // If they already provided a CipherSuite list, return 236 // an error if it has a bad order or is missing 237 // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256. 238 haveRequired := false 239 sawBad := false 240 for i, cs := range s.TLSConfig.CipherSuites { 241 switch cs { 242 case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, 243 // Alternative MTI cipher to not discourage ECDSA-only servers. 244 // See http://golang.org/cl/30721 for further information. 245 tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: 246 haveRequired = true 247 } 248 if isBadCipher(cs) { 249 sawBad = true 250 } else if sawBad { 251 return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs) 252 } 253 } 254 if !haveRequired { 255 return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher (need at least one of TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 or TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256).") 256 } 257 } 258 259 // Note: not setting MinVersion to tls.VersionTLS12, 260 // as we don't want to interfere with HTTP/1.1 traffic 261 // on the user's server. We enforce TLS 1.2 later once 262 // we accept a connection. Ideally this should be done 263 // during next-proto selection, but using TLS <1.2 with 264 // HTTP/2 is still the client's bug. 265 266 s.TLSConfig.PreferServerCipherSuites = true 267 268 haveNPN := false 269 for _, p := range s.TLSConfig.NextProtos { 270 if p == NextProtoTLS { 271 haveNPN = true 272 break 273 } 274 } 275 if !haveNPN { 276 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS) 277 } 278 279 if s.TLSNextProto == nil { 280 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){} 281 } 282 protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) { 283 if testHookOnConn != nil { 284 testHookOnConn() 285 } 286 // The TLSNextProto interface predates contexts, so 287 // the net/http package passes down its per-connection 288 // base context via an exported but unadvertised 289 // method on the Handler. This is for internal 290 // net/http<=>http2 use only. 291 var ctx context.Context 292 type baseContexter interface { 293 BaseContext() context.Context 294 } 295 if bc, ok := h.(baseContexter); ok { 296 ctx = bc.BaseContext() 297 } 298 conf.ServeConn(c, &ServeConnOpts{ 299 Context: ctx, 300 Handler: h, 301 BaseConfig: hs, 302 }) 303 } 304 s.TLSNextProto[NextProtoTLS] = protoHandler 305 return nil 306} 307 308// ServeConnOpts are options for the Server.ServeConn method. 309type ServeConnOpts struct { 310 // Context is the base context to use. 311 // If nil, context.Background is used. 312 Context context.Context 313 314 // BaseConfig optionally sets the base configuration 315 // for values. If nil, defaults are used. 316 BaseConfig *http.Server 317 318 // Handler specifies which handler to use for processing 319 // requests. If nil, BaseConfig.Handler is used. If BaseConfig 320 // or BaseConfig.Handler is nil, http.DefaultServeMux is used. 321 Handler http.Handler 322} 323 324func (o *ServeConnOpts) context() context.Context { 325 if o != nil && o.Context != nil { 326 return o.Context 327 } 328 return context.Background() 329} 330 331func (o *ServeConnOpts) baseConfig() *http.Server { 332 if o != nil && o.BaseConfig != nil { 333 return o.BaseConfig 334 } 335 return new(http.Server) 336} 337 338func (o *ServeConnOpts) handler() http.Handler { 339 if o != nil { 340 if o.Handler != nil { 341 return o.Handler 342 } 343 if o.BaseConfig != nil && o.BaseConfig.Handler != nil { 344 return o.BaseConfig.Handler 345 } 346 } 347 return http.DefaultServeMux 348} 349 350// ServeConn serves HTTP/2 requests on the provided connection and 351// blocks until the connection is no longer readable. 352// 353// ServeConn starts speaking HTTP/2 assuming that c has not had any 354// reads or writes. It writes its initial settings frame and expects 355// to be able to read the preface and settings frame from the 356// client. If c has a ConnectionState method like a *tls.Conn, the 357// ConnectionState is used to verify the TLS ciphersuite and to set 358// the Request.TLS field in Handlers. 359// 360// ServeConn does not support h2c by itself. Any h2c support must be 361// implemented in terms of providing a suitably-behaving net.Conn. 362// 363// The opts parameter is optional. If nil, default values are used. 364func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) { 365 baseCtx, cancel := serverConnBaseContext(c, opts) 366 defer cancel() 367 368 sc := &serverConn{ 369 srv: s, 370 hs: opts.baseConfig(), 371 conn: c, 372 baseCtx: baseCtx, 373 remoteAddrStr: c.RemoteAddr().String(), 374 bw: newBufferedWriter(c), 375 handler: opts.handler(), 376 streams: make(map[uint32]*stream), 377 readFrameCh: make(chan readFrameResult), 378 wantWriteFrameCh: make(chan FrameWriteRequest, 8), 379 serveMsgCh: make(chan interface{}, 8), 380 wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync 381 bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way 382 doneServing: make(chan struct{}), 383 clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value" 384 advMaxStreams: s.maxConcurrentStreams(), 385 initialStreamSendWindowSize: initialWindowSize, 386 maxFrameSize: initialMaxFrameSize, 387 headerTableSize: initialHeaderTableSize, 388 serveG: newGoroutineLock(), 389 pushEnabled: true, 390 } 391 392 s.state.registerConn(sc) 393 defer s.state.unregisterConn(sc) 394 395 // The net/http package sets the write deadline from the 396 // http.Server.WriteTimeout during the TLS handshake, but then 397 // passes the connection off to us with the deadline already set. 398 // Write deadlines are set per stream in serverConn.newStream. 399 // Disarm the net.Conn write deadline here. 400 if sc.hs.WriteTimeout != 0 { 401 sc.conn.SetWriteDeadline(time.Time{}) 402 } 403 404 if s.NewWriteScheduler != nil { 405 sc.writeSched = s.NewWriteScheduler() 406 } else { 407 sc.writeSched = NewRandomWriteScheduler() 408 } 409 410 // These start at the RFC-specified defaults. If there is a higher 411 // configured value for inflow, that will be updated when we send a 412 // WINDOW_UPDATE shortly after sending SETTINGS. 413 sc.flow.add(initialWindowSize) 414 sc.inflow.add(initialWindowSize) 415 sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) 416 417 fr := NewFramer(sc.bw, c) 418 fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) 419 fr.MaxHeaderListSize = sc.maxHeaderListSize() 420 fr.SetMaxReadFrameSize(s.maxReadFrameSize()) 421 sc.framer = fr 422 423 if tc, ok := c.(connectionStater); ok { 424 sc.tlsState = new(tls.ConnectionState) 425 *sc.tlsState = tc.ConnectionState() 426 // 9.2 Use of TLS Features 427 // An implementation of HTTP/2 over TLS MUST use TLS 428 // 1.2 or higher with the restrictions on feature set 429 // and cipher suite described in this section. Due to 430 // implementation limitations, it might not be 431 // possible to fail TLS negotiation. An endpoint MUST 432 // immediately terminate an HTTP/2 connection that 433 // does not meet the TLS requirements described in 434 // this section with a connection error (Section 435 // 5.4.1) of type INADEQUATE_SECURITY. 436 if sc.tlsState.Version < tls.VersionTLS12 { 437 sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low") 438 return 439 } 440 441 if sc.tlsState.ServerName == "" { 442 // Client must use SNI, but we don't enforce that anymore, 443 // since it was causing problems when connecting to bare IP 444 // addresses during development. 445 // 446 // TODO: optionally enforce? Or enforce at the time we receive 447 // a new request, and verify the ServerName matches the :authority? 448 // But that precludes proxy situations, perhaps. 449 // 450 // So for now, do nothing here again. 451 } 452 453 if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) { 454 // "Endpoints MAY choose to generate a connection error 455 // (Section 5.4.1) of type INADEQUATE_SECURITY if one of 456 // the prohibited cipher suites are negotiated." 457 // 458 // We choose that. In my opinion, the spec is weak 459 // here. It also says both parties must support at least 460 // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no 461 // excuses here. If we really must, we could allow an 462 // "AllowInsecureWeakCiphers" option on the server later. 463 // Let's see how it plays out first. 464 sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite)) 465 return 466 } 467 } 468 469 if hook := testHookGetServerConn; hook != nil { 470 hook(sc) 471 } 472 sc.serve() 473} 474 475func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) { 476 ctx, cancel = context.WithCancel(opts.context()) 477 ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr()) 478 if hs := opts.baseConfig(); hs != nil { 479 ctx = context.WithValue(ctx, http.ServerContextKey, hs) 480 } 481 return 482} 483 484func (sc *serverConn) rejectConn(err ErrCode, debug string) { 485 sc.vlogf("http2: server rejecting conn: %v, %s", err, debug) 486 // ignoring errors. hanging up anyway. 487 sc.framer.WriteGoAway(0, err, []byte(debug)) 488 sc.bw.Flush() 489 sc.conn.Close() 490} 491 492type serverConn struct { 493 // Immutable: 494 srv *Server 495 hs *http.Server 496 conn net.Conn 497 bw *bufferedWriter // writing to conn 498 handler http.Handler 499 baseCtx context.Context 500 framer *Framer 501 doneServing chan struct{} // closed when serverConn.serve ends 502 readFrameCh chan readFrameResult // written by serverConn.readFrames 503 wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve 504 wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes 505 bodyReadCh chan bodyReadMsg // from handlers -> serve 506 serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop 507 flow flow // conn-wide (not stream-specific) outbound flow control 508 inflow flow // conn-wide inbound flow control 509 tlsState *tls.ConnectionState // shared by all handlers, like net/http 510 remoteAddrStr string 511 writeSched WriteScheduler 512 513 // Everything following is owned by the serve loop; use serveG.check(): 514 serveG goroutineLock // used to verify funcs are on serve() 515 pushEnabled bool 516 sawFirstSettings bool // got the initial SETTINGS frame after the preface 517 needToSendSettingsAck bool 518 unackedSettings int // how many SETTINGS have we sent without ACKs? 519 queuedControlFrames int // control frames in the writeSched queue 520 clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) 521 advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client 522 curClientStreams uint32 // number of open streams initiated by the client 523 curPushedStreams uint32 // number of open streams initiated by server push 524 maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests 525 maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes 526 streams map[uint32]*stream 527 initialStreamSendWindowSize int32 528 maxFrameSize int32 529 headerTableSize uint32 530 peerMaxHeaderListSize uint32 // zero means unknown (default) 531 canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case 532 writingFrame bool // started writing a frame (on serve goroutine or separate) 533 writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh 534 needsFrameFlush bool // last frame write wasn't a flush 535 inGoAway bool // we've started to or sent GOAWAY 536 inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop 537 needToSendGoAway bool // we need to schedule a GOAWAY frame write 538 goAwayCode ErrCode 539 shutdownTimer *time.Timer // nil until used 540 idleTimer *time.Timer // nil if unused 541 542 // Owned by the writeFrameAsync goroutine: 543 headerWriteBuf bytes.Buffer 544 hpackEncoder *hpack.Encoder 545 546 // Used by startGracefulShutdown. 547 shutdownOnce sync.Once 548} 549 550func (sc *serverConn) maxHeaderListSize() uint32 { 551 n := sc.hs.MaxHeaderBytes 552 if n <= 0 { 553 n = http.DefaultMaxHeaderBytes 554 } 555 // http2's count is in a slightly different unit and includes 32 bytes per pair. 556 // So, take the net/http.Server value and pad it up a bit, assuming 10 headers. 557 const perFieldOverhead = 32 // per http2 spec 558 const typicalHeaders = 10 // conservative 559 return uint32(n + typicalHeaders*perFieldOverhead) 560} 561 562func (sc *serverConn) curOpenStreams() uint32 { 563 sc.serveG.check() 564 return sc.curClientStreams + sc.curPushedStreams 565} 566 567// stream represents a stream. This is the minimal metadata needed by 568// the serve goroutine. Most of the actual stream state is owned by 569// the http.Handler's goroutine in the responseWriter. Because the 570// responseWriter's responseWriterState is recycled at the end of a 571// handler, this struct intentionally has no pointer to the 572// *responseWriter{,State} itself, as the Handler ending nils out the 573// responseWriter's state field. 574type stream struct { 575 // immutable: 576 sc *serverConn 577 id uint32 578 body *pipe // non-nil if expecting DATA frames 579 cw closeWaiter // closed wait stream transitions to closed state 580 ctx context.Context 581 cancelCtx func() 582 583 // owned by serverConn's serve loop: 584 bodyBytes int64 // body bytes seen so far 585 declBodyBytes int64 // or -1 if undeclared 586 flow flow // limits writing from Handler to client 587 inflow flow // what the client is allowed to POST/etc to us 588 state streamState 589 resetQueued bool // RST_STREAM queued for write; set by sc.resetStream 590 gotTrailerHeader bool // HEADER frame for trailers was seen 591 wroteHeaders bool // whether we wrote headers (not status 100) 592 writeDeadline *time.Timer // nil if unused 593 594 trailer http.Header // accumulated trailers 595 reqTrailer http.Header // handler's Request.Trailer 596} 597 598func (sc *serverConn) Framer() *Framer { return sc.framer } 599func (sc *serverConn) CloseConn() error { return sc.conn.Close() } 600func (sc *serverConn) Flush() error { return sc.bw.Flush() } 601func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) { 602 return sc.hpackEncoder, &sc.headerWriteBuf 603} 604 605func (sc *serverConn) state(streamID uint32) (streamState, *stream) { 606 sc.serveG.check() 607 // http://tools.ietf.org/html/rfc7540#section-5.1 608 if st, ok := sc.streams[streamID]; ok { 609 return st.state, st 610 } 611 // "The first use of a new stream identifier implicitly closes all 612 // streams in the "idle" state that might have been initiated by 613 // that peer with a lower-valued stream identifier. For example, if 614 // a client sends a HEADERS frame on stream 7 without ever sending a 615 // frame on stream 5, then stream 5 transitions to the "closed" 616 // state when the first frame for stream 7 is sent or received." 617 if streamID%2 == 1 { 618 if streamID <= sc.maxClientStreamID { 619 return stateClosed, nil 620 } 621 } else { 622 if streamID <= sc.maxPushPromiseID { 623 return stateClosed, nil 624 } 625 } 626 return stateIdle, nil 627} 628 629// setConnState calls the net/http ConnState hook for this connection, if configured. 630// Note that the net/http package does StateNew and StateClosed for us. 631// There is currently no plan for StateHijacked or hijacking HTTP/2 connections. 632func (sc *serverConn) setConnState(state http.ConnState) { 633 if sc.hs.ConnState != nil { 634 sc.hs.ConnState(sc.conn, state) 635 } 636} 637 638func (sc *serverConn) vlogf(format string, args ...interface{}) { 639 if VerboseLogs { 640 sc.logf(format, args...) 641 } 642} 643 644func (sc *serverConn) logf(format string, args ...interface{}) { 645 if lg := sc.hs.ErrorLog; lg != nil { 646 lg.Printf(format, args...) 647 } else { 648 log.Printf(format, args...) 649 } 650} 651 652// errno returns v's underlying uintptr, else 0. 653// 654// TODO: remove this helper function once http2 can use build 655// tags. See comment in isClosedConnError. 656func errno(v error) uintptr { 657 if rv := reflect.ValueOf(v); rv.Kind() == reflect.Uintptr { 658 return uintptr(rv.Uint()) 659 } 660 return 0 661} 662 663// isClosedConnError reports whether err is an error from use of a closed 664// network connection. 665func isClosedConnError(err error) bool { 666 if err == nil { 667 return false 668 } 669 670 // TODO: remove this string search and be more like the Windows 671 // case below. That might involve modifying the standard library 672 // to return better error types. 673 str := err.Error() 674 if strings.Contains(str, "use of closed network connection") { 675 return true 676 } 677 678 // TODO(bradfitz): x/tools/cmd/bundle doesn't really support 679 // build tags, so I can't make an http2_windows.go file with 680 // Windows-specific stuff. Fix that and move this, once we 681 // have a way to bundle this into std's net/http somehow. 682 if runtime.GOOS == "windows" { 683 if oe, ok := err.(*net.OpError); ok && oe.Op == "read" { 684 if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" { 685 const WSAECONNABORTED = 10053 686 const WSAECONNRESET = 10054 687 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED { 688 return true 689 } 690 } 691 } 692 } 693 return false 694} 695 696func (sc *serverConn) condlogf(err error, format string, args ...interface{}) { 697 if err == nil { 698 return 699 } 700 if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout { 701 // Boring, expected errors. 702 sc.vlogf(format, args...) 703 } else { 704 sc.logf(format, args...) 705 } 706} 707 708func (sc *serverConn) canonicalHeader(v string) string { 709 sc.serveG.check() 710 buildCommonHeaderMapsOnce() 711 cv, ok := commonCanonHeader[v] 712 if ok { 713 return cv 714 } 715 cv, ok = sc.canonHeader[v] 716 if ok { 717 return cv 718 } 719 if sc.canonHeader == nil { 720 sc.canonHeader = make(map[string]string) 721 } 722 cv = http.CanonicalHeaderKey(v) 723 sc.canonHeader[v] = cv 724 return cv 725} 726 727type readFrameResult struct { 728 f Frame // valid until readMore is called 729 err error 730 731 // readMore should be called once the consumer no longer needs or 732 // retains f. After readMore, f is invalid and more frames can be 733 // read. 734 readMore func() 735} 736 737// readFrames is the loop that reads incoming frames. 738// It takes care to only read one frame at a time, blocking until the 739// consumer is done with the frame. 740// It's run on its own goroutine. 741func (sc *serverConn) readFrames() { 742 gate := make(gate) 743 gateDone := gate.Done 744 for { 745 f, err := sc.framer.ReadFrame() 746 select { 747 case sc.readFrameCh <- readFrameResult{f, err, gateDone}: 748 case <-sc.doneServing: 749 return 750 } 751 select { 752 case <-gate: 753 case <-sc.doneServing: 754 return 755 } 756 if terminalReadFrameError(err) { 757 return 758 } 759 } 760} 761 762// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine. 763type frameWriteResult struct { 764 _ incomparable 765 wr FrameWriteRequest // what was written (or attempted) 766 err error // result of the writeFrame call 767} 768 769// writeFrameAsync runs in its own goroutine and writes a single frame 770// and then reports when it's done. 771// At most one goroutine can be running writeFrameAsync at a time per 772// serverConn. 773func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) { 774 err := wr.write.writeFrame(sc) 775 sc.wroteFrameCh <- frameWriteResult{wr: wr, err: err} 776} 777 778func (sc *serverConn) closeAllStreamsOnConnClose() { 779 sc.serveG.check() 780 for _, st := range sc.streams { 781 sc.closeStream(st, errClientDisconnected) 782 } 783} 784 785func (sc *serverConn) stopShutdownTimer() { 786 sc.serveG.check() 787 if t := sc.shutdownTimer; t != nil { 788 t.Stop() 789 } 790} 791 792func (sc *serverConn) notePanic() { 793 // Note: this is for serverConn.serve panicking, not http.Handler code. 794 if testHookOnPanicMu != nil { 795 testHookOnPanicMu.Lock() 796 defer testHookOnPanicMu.Unlock() 797 } 798 if testHookOnPanic != nil { 799 if e := recover(); e != nil { 800 if testHookOnPanic(sc, e) { 801 panic(e) 802 } 803 } 804 } 805} 806 807func (sc *serverConn) serve() { 808 sc.serveG.check() 809 defer sc.notePanic() 810 defer sc.conn.Close() 811 defer sc.closeAllStreamsOnConnClose() 812 defer sc.stopShutdownTimer() 813 defer close(sc.doneServing) // unblocks handlers trying to send 814 815 if VerboseLogs { 816 sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs) 817 } 818 819 sc.writeFrame(FrameWriteRequest{ 820 write: writeSettings{ 821 {SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, 822 {SettingMaxConcurrentStreams, sc.advMaxStreams}, 823 {SettingMaxHeaderListSize, sc.maxHeaderListSize()}, 824 {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())}, 825 }, 826 }) 827 sc.unackedSettings++ 828 829 // Each connection starts with intialWindowSize inflow tokens. 830 // If a higher value is configured, we add more tokens. 831 if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 { 832 sc.sendWindowUpdate(nil, int(diff)) 833 } 834 835 if err := sc.readPreface(); err != nil { 836 sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) 837 return 838 } 839 // Now that we've got the preface, get us out of the 840 // "StateNew" state. We can't go directly to idle, though. 841 // Active means we read some data and anticipate a request. We'll 842 // do another Active when we get a HEADERS frame. 843 sc.setConnState(http.StateActive) 844 sc.setConnState(http.StateIdle) 845 846 if sc.srv.IdleTimeout != 0 { 847 sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer) 848 defer sc.idleTimer.Stop() 849 } 850 851 go sc.readFrames() // closed by defer sc.conn.Close above 852 853 settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer) 854 defer settingsTimer.Stop() 855 856 loopNum := 0 857 for { 858 loopNum++ 859 select { 860 case wr := <-sc.wantWriteFrameCh: 861 if se, ok := wr.write.(StreamError); ok { 862 sc.resetStream(se) 863 break 864 } 865 sc.writeFrame(wr) 866 case res := <-sc.wroteFrameCh: 867 sc.wroteFrame(res) 868 case res := <-sc.readFrameCh: 869 if !sc.processFrameFromReader(res) { 870 return 871 } 872 res.readMore() 873 if settingsTimer != nil { 874 settingsTimer.Stop() 875 settingsTimer = nil 876 } 877 case m := <-sc.bodyReadCh: 878 sc.noteBodyRead(m.st, m.n) 879 case msg := <-sc.serveMsgCh: 880 switch v := msg.(type) { 881 case func(int): 882 v(loopNum) // for testing 883 case *serverMessage: 884 switch v { 885 case settingsTimerMsg: 886 sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) 887 return 888 case idleTimerMsg: 889 sc.vlogf("connection is idle") 890 sc.goAway(ErrCodeNo) 891 case shutdownTimerMsg: 892 sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) 893 return 894 case gracefulShutdownMsg: 895 sc.startGracefulShutdownInternal() 896 default: 897 panic("unknown timer") 898 } 899 case *startPushRequest: 900 sc.startPush(v) 901 default: 902 panic(fmt.Sprintf("unexpected type %T", v)) 903 } 904 } 905 906 // If the peer is causing us to generate a lot of control frames, 907 // but not reading them from us, assume they are trying to make us 908 // run out of memory. 909 if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() { 910 sc.vlogf("http2: too many control frames in send queue, closing connection") 911 return 912 } 913 914 // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY 915 // with no error code (graceful shutdown), don't start the timer until 916 // all open streams have been completed. 917 sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame 918 gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0 919 if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) { 920 sc.shutDownIn(goAwayTimeout) 921 } 922 } 923} 924 925func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) { 926 select { 927 case <-sc.doneServing: 928 case <-sharedCh: 929 close(privateCh) 930 } 931} 932 933type serverMessage int 934 935// Message values sent to serveMsgCh. 936var ( 937 settingsTimerMsg = new(serverMessage) 938 idleTimerMsg = new(serverMessage) 939 shutdownTimerMsg = new(serverMessage) 940 gracefulShutdownMsg = new(serverMessage) 941) 942 943func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) } 944func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) } 945func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) } 946 947func (sc *serverConn) sendServeMsg(msg interface{}) { 948 sc.serveG.checkNotOn() // NOT 949 select { 950 case sc.serveMsgCh <- msg: 951 case <-sc.doneServing: 952 } 953} 954 955var errPrefaceTimeout = errors.New("timeout waiting for client preface") 956 957// readPreface reads the ClientPreface greeting from the peer or 958// returns errPrefaceTimeout on timeout, or an error if the greeting 959// is invalid. 960func (sc *serverConn) readPreface() error { 961 errc := make(chan error, 1) 962 go func() { 963 // Read the client preface 964 buf := make([]byte, len(ClientPreface)) 965 if _, err := io.ReadFull(sc.conn, buf); err != nil { 966 errc <- err 967 } else if !bytes.Equal(buf, clientPreface) { 968 errc <- fmt.Errorf("bogus greeting %q", buf) 969 } else { 970 errc <- nil 971 } 972 }() 973 timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server? 974 defer timer.Stop() 975 select { 976 case <-timer.C: 977 return errPrefaceTimeout 978 case err := <-errc: 979 if err == nil { 980 if VerboseLogs { 981 sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr()) 982 } 983 } 984 return err 985 } 986} 987 988var errChanPool = sync.Pool{ 989 New: func() interface{} { return make(chan error, 1) }, 990} 991 992var writeDataPool = sync.Pool{ 993 New: func() interface{} { return new(writeData) }, 994} 995 996// writeDataFromHandler writes DATA response frames from a handler on 997// the given stream. 998func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error { 999 ch := errChanPool.Get().(chan error) 1000 writeArg := writeDataPool.Get().(*writeData) 1001 *writeArg = writeData{stream.id, data, endStream} 1002 err := sc.writeFrameFromHandler(FrameWriteRequest{ 1003 write: writeArg, 1004 stream: stream, 1005 done: ch, 1006 }) 1007 if err != nil { 1008 return err 1009 } 1010 var frameWriteDone bool // the frame write is done (successfully or not) 1011 select { 1012 case err = <-ch: 1013 frameWriteDone = true 1014 case <-sc.doneServing: 1015 return errClientDisconnected 1016 case <-stream.cw: 1017 // If both ch and stream.cw were ready (as might 1018 // happen on the final Write after an http.Handler 1019 // ends), prefer the write result. Otherwise this 1020 // might just be us successfully closing the stream. 1021 // The writeFrameAsync and serve goroutines guarantee 1022 // that the ch send will happen before the stream.cw 1023 // close. 1024 select { 1025 case err = <-ch: 1026 frameWriteDone = true 1027 default: 1028 return errStreamClosed 1029 } 1030 } 1031 errChanPool.Put(ch) 1032 if frameWriteDone { 1033 writeDataPool.Put(writeArg) 1034 } 1035 return err 1036} 1037 1038// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts 1039// if the connection has gone away. 1040// 1041// This must not be run from the serve goroutine itself, else it might 1042// deadlock writing to sc.wantWriteFrameCh (which is only mildly 1043// buffered and is read by serve itself). If you're on the serve 1044// goroutine, call writeFrame instead. 1045func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error { 1046 sc.serveG.checkNotOn() // NOT 1047 select { 1048 case sc.wantWriteFrameCh <- wr: 1049 return nil 1050 case <-sc.doneServing: 1051 // Serve loop is gone. 1052 // Client has closed their connection to the server. 1053 return errClientDisconnected 1054 } 1055} 1056 1057// writeFrame schedules a frame to write and sends it if there's nothing 1058// already being written. 1059// 1060// There is no pushback here (the serve goroutine never blocks). It's 1061// the http.Handlers that block, waiting for their previous frames to 1062// make it onto the wire 1063// 1064// If you're not on the serve goroutine, use writeFrameFromHandler instead. 1065func (sc *serverConn) writeFrame(wr FrameWriteRequest) { 1066 sc.serveG.check() 1067 1068 // If true, wr will not be written and wr.done will not be signaled. 1069 var ignoreWrite bool 1070 1071 // We are not allowed to write frames on closed streams. RFC 7540 Section 1072 // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on 1073 // a closed stream." Our server never sends PRIORITY, so that exception 1074 // does not apply. 1075 // 1076 // The serverConn might close an open stream while the stream's handler 1077 // is still running. For example, the server might close a stream when it 1078 // receives bad data from the client. If this happens, the handler might 1079 // attempt to write a frame after the stream has been closed (since the 1080 // handler hasn't yet been notified of the close). In this case, we simply 1081 // ignore the frame. The handler will notice that the stream is closed when 1082 // it waits for the frame to be written. 1083 // 1084 // As an exception to this rule, we allow sending RST_STREAM after close. 1085 // This allows us to immediately reject new streams without tracking any 1086 // state for those streams (except for the queued RST_STREAM frame). This 1087 // may result in duplicate RST_STREAMs in some cases, but the client should 1088 // ignore those. 1089 if wr.StreamID() != 0 { 1090 _, isReset := wr.write.(StreamError) 1091 if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset { 1092 ignoreWrite = true 1093 } 1094 } 1095 1096 // Don't send a 100-continue response if we've already sent headers. 1097 // See golang.org/issue/14030. 1098 switch wr.write.(type) { 1099 case *writeResHeaders: 1100 wr.stream.wroteHeaders = true 1101 case write100ContinueHeadersFrame: 1102 if wr.stream.wroteHeaders { 1103 // We do not need to notify wr.done because this frame is 1104 // never written with wr.done != nil. 1105 if wr.done != nil { 1106 panic("wr.done != nil for write100ContinueHeadersFrame") 1107 } 1108 ignoreWrite = true 1109 } 1110 } 1111 1112 if !ignoreWrite { 1113 if wr.isControl() { 1114 sc.queuedControlFrames++ 1115 // For extra safety, detect wraparounds, which should not happen, 1116 // and pull the plug. 1117 if sc.queuedControlFrames < 0 { 1118 sc.conn.Close() 1119 } 1120 } 1121 sc.writeSched.Push(wr) 1122 } 1123 sc.scheduleFrameWrite() 1124} 1125 1126// startFrameWrite starts a goroutine to write wr (in a separate 1127// goroutine since that might block on the network), and updates the 1128// serve goroutine's state about the world, updated from info in wr. 1129func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) { 1130 sc.serveG.check() 1131 if sc.writingFrame { 1132 panic("internal error: can only be writing one frame at a time") 1133 } 1134 1135 st := wr.stream 1136 if st != nil { 1137 switch st.state { 1138 case stateHalfClosedLocal: 1139 switch wr.write.(type) { 1140 case StreamError, handlerPanicRST, writeWindowUpdate: 1141 // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE 1142 // in this state. (We never send PRIORITY from the server, so that is not checked.) 1143 default: 1144 panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr)) 1145 } 1146 case stateClosed: 1147 panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr)) 1148 } 1149 } 1150 if wpp, ok := wr.write.(*writePushPromise); ok { 1151 var err error 1152 wpp.promisedID, err = wpp.allocatePromisedID() 1153 if err != nil { 1154 sc.writingFrameAsync = false 1155 wr.replyToWriter(err) 1156 return 1157 } 1158 } 1159 1160 sc.writingFrame = true 1161 sc.needsFrameFlush = true 1162 if wr.write.staysWithinBuffer(sc.bw.Available()) { 1163 sc.writingFrameAsync = false 1164 err := wr.write.writeFrame(sc) 1165 sc.wroteFrame(frameWriteResult{wr: wr, err: err}) 1166 } else { 1167 sc.writingFrameAsync = true 1168 go sc.writeFrameAsync(wr) 1169 } 1170} 1171 1172// errHandlerPanicked is the error given to any callers blocked in a read from 1173// Request.Body when the main goroutine panics. Since most handlers read in the 1174// main ServeHTTP goroutine, this will show up rarely. 1175var errHandlerPanicked = errors.New("http2: handler panicked") 1176 1177// wroteFrame is called on the serve goroutine with the result of 1178// whatever happened on writeFrameAsync. 1179func (sc *serverConn) wroteFrame(res frameWriteResult) { 1180 sc.serveG.check() 1181 if !sc.writingFrame { 1182 panic("internal error: expected to be already writing a frame") 1183 } 1184 sc.writingFrame = false 1185 sc.writingFrameAsync = false 1186 1187 wr := res.wr 1188 1189 if writeEndsStream(wr.write) { 1190 st := wr.stream 1191 if st == nil { 1192 panic("internal error: expecting non-nil stream") 1193 } 1194 switch st.state { 1195 case stateOpen: 1196 // Here we would go to stateHalfClosedLocal in 1197 // theory, but since our handler is done and 1198 // the net/http package provides no mechanism 1199 // for closing a ResponseWriter while still 1200 // reading data (see possible TODO at top of 1201 // this file), we go into closed state here 1202 // anyway, after telling the peer we're 1203 // hanging up on them. We'll transition to 1204 // stateClosed after the RST_STREAM frame is 1205 // written. 1206 st.state = stateHalfClosedLocal 1207 // Section 8.1: a server MAY request that the client abort 1208 // transmission of a request without error by sending a 1209 // RST_STREAM with an error code of NO_ERROR after sending 1210 // a complete response. 1211 sc.resetStream(streamError(st.id, ErrCodeNo)) 1212 case stateHalfClosedRemote: 1213 sc.closeStream(st, errHandlerComplete) 1214 } 1215 } else { 1216 switch v := wr.write.(type) { 1217 case StreamError: 1218 // st may be unknown if the RST_STREAM was generated to reject bad input. 1219 if st, ok := sc.streams[v.StreamID]; ok { 1220 sc.closeStream(st, v) 1221 } 1222 case handlerPanicRST: 1223 sc.closeStream(wr.stream, errHandlerPanicked) 1224 } 1225 } 1226 1227 // Reply (if requested) to unblock the ServeHTTP goroutine. 1228 wr.replyToWriter(res.err) 1229 1230 sc.scheduleFrameWrite() 1231} 1232 1233// scheduleFrameWrite tickles the frame writing scheduler. 1234// 1235// If a frame is already being written, nothing happens. This will be called again 1236// when the frame is done being written. 1237// 1238// If a frame isn't being written and we need to send one, the best frame 1239// to send is selected by writeSched. 1240// 1241// If a frame isn't being written and there's nothing else to send, we 1242// flush the write buffer. 1243func (sc *serverConn) scheduleFrameWrite() { 1244 sc.serveG.check() 1245 if sc.writingFrame || sc.inFrameScheduleLoop { 1246 return 1247 } 1248 sc.inFrameScheduleLoop = true 1249 for !sc.writingFrameAsync { 1250 if sc.needToSendGoAway { 1251 sc.needToSendGoAway = false 1252 sc.startFrameWrite(FrameWriteRequest{ 1253 write: &writeGoAway{ 1254 maxStreamID: sc.maxClientStreamID, 1255 code: sc.goAwayCode, 1256 }, 1257 }) 1258 continue 1259 } 1260 if sc.needToSendSettingsAck { 1261 sc.needToSendSettingsAck = false 1262 sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}}) 1263 continue 1264 } 1265 if !sc.inGoAway || sc.goAwayCode == ErrCodeNo { 1266 if wr, ok := sc.writeSched.Pop(); ok { 1267 if wr.isControl() { 1268 sc.queuedControlFrames-- 1269 } 1270 sc.startFrameWrite(wr) 1271 continue 1272 } 1273 } 1274 if sc.needsFrameFlush { 1275 sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}}) 1276 sc.needsFrameFlush = false // after startFrameWrite, since it sets this true 1277 continue 1278 } 1279 break 1280 } 1281 sc.inFrameScheduleLoop = false 1282} 1283 1284// startGracefulShutdown gracefully shuts down a connection. This 1285// sends GOAWAY with ErrCodeNo to tell the client we're gracefully 1286// shutting down. The connection isn't closed until all current 1287// streams are done. 1288// 1289// startGracefulShutdown returns immediately; it does not wait until 1290// the connection has shut down. 1291func (sc *serverConn) startGracefulShutdown() { 1292 sc.serveG.checkNotOn() // NOT 1293 sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) }) 1294} 1295 1296// After sending GOAWAY, the connection will close after goAwayTimeout. 1297// If we close the connection immediately after sending GOAWAY, there may 1298// be unsent data in our kernel receive buffer, which will cause the kernel 1299// to send a TCP RST on close() instead of a FIN. This RST will abort the 1300// connection immediately, whether or not the client had received the GOAWAY. 1301// 1302// Ideally we should delay for at least 1 RTT + epsilon so the client has 1303// a chance to read the GOAWAY and stop sending messages. Measuring RTT 1304// is hard, so we approximate with 1 second. See golang.org/issue/18701. 1305// 1306// This is a var so it can be shorter in tests, where all requests uses the 1307// loopback interface making the expected RTT very small. 1308// 1309// TODO: configurable? 1310var goAwayTimeout = 1 * time.Second 1311 1312func (sc *serverConn) startGracefulShutdownInternal() { 1313 sc.goAway(ErrCodeNo) 1314} 1315 1316func (sc *serverConn) goAway(code ErrCode) { 1317 sc.serveG.check() 1318 if sc.inGoAway { 1319 return 1320 } 1321 sc.inGoAway = true 1322 sc.needToSendGoAway = true 1323 sc.goAwayCode = code 1324 sc.scheduleFrameWrite() 1325} 1326 1327func (sc *serverConn) shutDownIn(d time.Duration) { 1328 sc.serveG.check() 1329 sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer) 1330} 1331 1332func (sc *serverConn) resetStream(se StreamError) { 1333 sc.serveG.check() 1334 sc.writeFrame(FrameWriteRequest{write: se}) 1335 if st, ok := sc.streams[se.StreamID]; ok { 1336 st.resetQueued = true 1337 } 1338} 1339 1340// processFrameFromReader processes the serve loop's read from readFrameCh from the 1341// frame-reading goroutine. 1342// processFrameFromReader returns whether the connection should be kept open. 1343func (sc *serverConn) processFrameFromReader(res readFrameResult) bool { 1344 sc.serveG.check() 1345 err := res.err 1346 if err != nil { 1347 if err == ErrFrameTooLarge { 1348 sc.goAway(ErrCodeFrameSize) 1349 return true // goAway will close the loop 1350 } 1351 clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) 1352 if clientGone { 1353 // TODO: could we also get into this state if 1354 // the peer does a half close 1355 // (e.g. CloseWrite) because they're done 1356 // sending frames but they're still wanting 1357 // our open replies? Investigate. 1358 // TODO: add CloseWrite to crypto/tls.Conn first 1359 // so we have a way to test this? I suppose 1360 // just for testing we could have a non-TLS mode. 1361 return false 1362 } 1363 } else { 1364 f := res.f 1365 if VerboseLogs { 1366 sc.vlogf("http2: server read frame %v", summarizeFrame(f)) 1367 } 1368 err = sc.processFrame(f) 1369 if err == nil { 1370 return true 1371 } 1372 } 1373 1374 switch ev := err.(type) { 1375 case StreamError: 1376 sc.resetStream(ev) 1377 return true 1378 case goAwayFlowError: 1379 sc.goAway(ErrCodeFlowControl) 1380 return true 1381 case ConnectionError: 1382 sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev) 1383 sc.goAway(ErrCode(ev)) 1384 return true // goAway will handle shutdown 1385 default: 1386 if res.err != nil { 1387 sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err) 1388 } else { 1389 sc.logf("http2: server closing client connection: %v", err) 1390 } 1391 return false 1392 } 1393} 1394 1395func (sc *serverConn) processFrame(f Frame) error { 1396 sc.serveG.check() 1397 1398 // First frame received must be SETTINGS. 1399 if !sc.sawFirstSettings { 1400 if _, ok := f.(*SettingsFrame); !ok { 1401 return ConnectionError(ErrCodeProtocol) 1402 } 1403 sc.sawFirstSettings = true 1404 } 1405 1406 switch f := f.(type) { 1407 case *SettingsFrame: 1408 return sc.processSettings(f) 1409 case *MetaHeadersFrame: 1410 return sc.processHeaders(f) 1411 case *WindowUpdateFrame: 1412 return sc.processWindowUpdate(f) 1413 case *PingFrame: 1414 return sc.processPing(f) 1415 case *DataFrame: 1416 return sc.processData(f) 1417 case *RSTStreamFrame: 1418 return sc.processResetStream(f) 1419 case *PriorityFrame: 1420 return sc.processPriority(f) 1421 case *GoAwayFrame: 1422 return sc.processGoAway(f) 1423 case *PushPromiseFrame: 1424 // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE 1425 // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. 1426 return ConnectionError(ErrCodeProtocol) 1427 default: 1428 sc.vlogf("http2: server ignoring frame: %v", f.Header()) 1429 return nil 1430 } 1431} 1432 1433func (sc *serverConn) processPing(f *PingFrame) error { 1434 sc.serveG.check() 1435 if f.IsAck() { 1436 // 6.7 PING: " An endpoint MUST NOT respond to PING frames 1437 // containing this flag." 1438 return nil 1439 } 1440 if f.StreamID != 0 { 1441 // "PING frames are not associated with any individual 1442 // stream. If a PING frame is received with a stream 1443 // identifier field value other than 0x0, the recipient MUST 1444 // respond with a connection error (Section 5.4.1) of type 1445 // PROTOCOL_ERROR." 1446 return ConnectionError(ErrCodeProtocol) 1447 } 1448 if sc.inGoAway && sc.goAwayCode != ErrCodeNo { 1449 return nil 1450 } 1451 sc.writeFrame(FrameWriteRequest{write: writePingAck{f}}) 1452 return nil 1453} 1454 1455func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error { 1456 sc.serveG.check() 1457 switch { 1458 case f.StreamID != 0: // stream-level flow control 1459 state, st := sc.state(f.StreamID) 1460 if state == stateIdle { 1461 // Section 5.1: "Receiving any frame other than HEADERS 1462 // or PRIORITY on a stream in this state MUST be 1463 // treated as a connection error (Section 5.4.1) of 1464 // type PROTOCOL_ERROR." 1465 return ConnectionError(ErrCodeProtocol) 1466 } 1467 if st == nil { 1468 // "WINDOW_UPDATE can be sent by a peer that has sent a 1469 // frame bearing the END_STREAM flag. This means that a 1470 // receiver could receive a WINDOW_UPDATE frame on a "half 1471 // closed (remote)" or "closed" stream. A receiver MUST 1472 // NOT treat this as an error, see Section 5.1." 1473 return nil 1474 } 1475 if !st.flow.add(int32(f.Increment)) { 1476 return streamError(f.StreamID, ErrCodeFlowControl) 1477 } 1478 default: // connection-level flow control 1479 if !sc.flow.add(int32(f.Increment)) { 1480 return goAwayFlowError{} 1481 } 1482 } 1483 sc.scheduleFrameWrite() 1484 return nil 1485} 1486 1487func (sc *serverConn) processResetStream(f *RSTStreamFrame) error { 1488 sc.serveG.check() 1489 1490 state, st := sc.state(f.StreamID) 1491 if state == stateIdle { 1492 // 6.4 "RST_STREAM frames MUST NOT be sent for a 1493 // stream in the "idle" state. If a RST_STREAM frame 1494 // identifying an idle stream is received, the 1495 // recipient MUST treat this as a connection error 1496 // (Section 5.4.1) of type PROTOCOL_ERROR. 1497 return ConnectionError(ErrCodeProtocol) 1498 } 1499 if st != nil { 1500 st.cancelCtx() 1501 sc.closeStream(st, streamError(f.StreamID, f.ErrCode)) 1502 } 1503 return nil 1504} 1505 1506func (sc *serverConn) closeStream(st *stream, err error) { 1507 sc.serveG.check() 1508 if st.state == stateIdle || st.state == stateClosed { 1509 panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state)) 1510 } 1511 st.state = stateClosed 1512 if st.writeDeadline != nil { 1513 st.writeDeadline.Stop() 1514 } 1515 if st.isPushed() { 1516 sc.curPushedStreams-- 1517 } else { 1518 sc.curClientStreams-- 1519 } 1520 delete(sc.streams, st.id) 1521 if len(sc.streams) == 0 { 1522 sc.setConnState(http.StateIdle) 1523 if sc.srv.IdleTimeout != 0 { 1524 sc.idleTimer.Reset(sc.srv.IdleTimeout) 1525 } 1526 if h1ServerKeepAlivesDisabled(sc.hs) { 1527 sc.startGracefulShutdownInternal() 1528 } 1529 } 1530 if p := st.body; p != nil { 1531 // Return any buffered unread bytes worth of conn-level flow control. 1532 // See golang.org/issue/16481 1533 sc.sendWindowUpdate(nil, p.Len()) 1534 1535 p.CloseWithError(err) 1536 } 1537 st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc 1538 sc.writeSched.CloseStream(st.id) 1539} 1540 1541func (sc *serverConn) processSettings(f *SettingsFrame) error { 1542 sc.serveG.check() 1543 if f.IsAck() { 1544 sc.unackedSettings-- 1545 if sc.unackedSettings < 0 { 1546 // Why is the peer ACKing settings we never sent? 1547 // The spec doesn't mention this case, but 1548 // hang up on them anyway. 1549 return ConnectionError(ErrCodeProtocol) 1550 } 1551 return nil 1552 } 1553 if f.NumSettings() > 100 || f.HasDuplicates() { 1554 // This isn't actually in the spec, but hang up on 1555 // suspiciously large settings frames or those with 1556 // duplicate entries. 1557 return ConnectionError(ErrCodeProtocol) 1558 } 1559 if err := f.ForeachSetting(sc.processSetting); err != nil { 1560 return err 1561 } 1562 // TODO: judging by RFC 7540, Section 6.5.3 each SETTINGS frame should be 1563 // acknowledged individually, even if multiple are received before the ACK. 1564 sc.needToSendSettingsAck = true 1565 sc.scheduleFrameWrite() 1566 return nil 1567} 1568 1569func (sc *serverConn) processSetting(s Setting) error { 1570 sc.serveG.check() 1571 if err := s.Valid(); err != nil { 1572 return err 1573 } 1574 if VerboseLogs { 1575 sc.vlogf("http2: server processing setting %v", s) 1576 } 1577 switch s.ID { 1578 case SettingHeaderTableSize: 1579 sc.headerTableSize = s.Val 1580 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val) 1581 case SettingEnablePush: 1582 sc.pushEnabled = s.Val != 0 1583 case SettingMaxConcurrentStreams: 1584 sc.clientMaxStreams = s.Val 1585 case SettingInitialWindowSize: 1586 return sc.processSettingInitialWindowSize(s.Val) 1587 case SettingMaxFrameSize: 1588 sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31 1589 case SettingMaxHeaderListSize: 1590 sc.peerMaxHeaderListSize = s.Val 1591 default: 1592 // Unknown setting: "An endpoint that receives a SETTINGS 1593 // frame with any unknown or unsupported identifier MUST 1594 // ignore that setting." 1595 if VerboseLogs { 1596 sc.vlogf("http2: server ignoring unknown setting %v", s) 1597 } 1598 } 1599 return nil 1600} 1601 1602func (sc *serverConn) processSettingInitialWindowSize(val uint32) error { 1603 sc.serveG.check() 1604 // Note: val already validated to be within range by 1605 // processSetting's Valid call. 1606 1607 // "A SETTINGS frame can alter the initial flow control window 1608 // size for all current streams. When the value of 1609 // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST 1610 // adjust the size of all stream flow control windows that it 1611 // maintains by the difference between the new value and the 1612 // old value." 1613 old := sc.initialStreamSendWindowSize 1614 sc.initialStreamSendWindowSize = int32(val) 1615 growth := int32(val) - old // may be negative 1616 for _, st := range sc.streams { 1617 if !st.flow.add(growth) { 1618 // 6.9.2 Initial Flow Control Window Size 1619 // "An endpoint MUST treat a change to 1620 // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow 1621 // control window to exceed the maximum size as a 1622 // connection error (Section 5.4.1) of type 1623 // FLOW_CONTROL_ERROR." 1624 return ConnectionError(ErrCodeFlowControl) 1625 } 1626 } 1627 return nil 1628} 1629 1630func (sc *serverConn) processData(f *DataFrame) error { 1631 sc.serveG.check() 1632 if sc.inGoAway && sc.goAwayCode != ErrCodeNo { 1633 return nil 1634 } 1635 data := f.Data() 1636 1637 // "If a DATA frame is received whose stream is not in "open" 1638 // or "half closed (local)" state, the recipient MUST respond 1639 // with a stream error (Section 5.4.2) of type STREAM_CLOSED." 1640 id := f.Header().StreamID 1641 state, st := sc.state(id) 1642 if id == 0 || state == stateIdle { 1643 // Section 5.1: "Receiving any frame other than HEADERS 1644 // or PRIORITY on a stream in this state MUST be 1645 // treated as a connection error (Section 5.4.1) of 1646 // type PROTOCOL_ERROR." 1647 return ConnectionError(ErrCodeProtocol) 1648 } 1649 if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued { 1650 // This includes sending a RST_STREAM if the stream is 1651 // in stateHalfClosedLocal (which currently means that 1652 // the http.Handler returned, so it's done reading & 1653 // done writing). Try to stop the client from sending 1654 // more DATA. 1655 1656 // But still enforce their connection-level flow control, 1657 // and return any flow control bytes since we're not going 1658 // to consume them. 1659 if sc.inflow.available() < int32(f.Length) { 1660 return streamError(id, ErrCodeFlowControl) 1661 } 1662 // Deduct the flow control from inflow, since we're 1663 // going to immediately add it back in 1664 // sendWindowUpdate, which also schedules sending the 1665 // frames. 1666 sc.inflow.take(int32(f.Length)) 1667 sc.sendWindowUpdate(nil, int(f.Length)) // conn-level 1668 1669 if st != nil && st.resetQueued { 1670 // Already have a stream error in flight. Don't send another. 1671 return nil 1672 } 1673 return streamError(id, ErrCodeStreamClosed) 1674 } 1675 if st.body == nil { 1676 panic("internal error: should have a body in this state") 1677 } 1678 1679 // Sender sending more than they'd declared? 1680 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { 1681 st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) 1682 // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the 1683 // value of a content-length header field does not equal the sum of the 1684 // DATA frame payload lengths that form the body. 1685 return streamError(id, ErrCodeProtocol) 1686 } 1687 if f.Length > 0 { 1688 // Check whether the client has flow control quota. 1689 if st.inflow.available() < int32(f.Length) { 1690 return streamError(id, ErrCodeFlowControl) 1691 } 1692 st.inflow.take(int32(f.Length)) 1693 1694 if len(data) > 0 { 1695 wrote, err := st.body.Write(data) 1696 if err != nil { 1697 return streamError(id, ErrCodeStreamClosed) 1698 } 1699 if wrote != len(data) { 1700 panic("internal error: bad Writer") 1701 } 1702 st.bodyBytes += int64(len(data)) 1703 } 1704 1705 // Return any padded flow control now, since we won't 1706 // refund it later on body reads. 1707 if pad := int32(f.Length) - int32(len(data)); pad > 0 { 1708 sc.sendWindowUpdate32(nil, pad) 1709 sc.sendWindowUpdate32(st, pad) 1710 } 1711 } 1712 if f.StreamEnded() { 1713 st.endStream() 1714 } 1715 return nil 1716} 1717 1718func (sc *serverConn) processGoAway(f *GoAwayFrame) error { 1719 sc.serveG.check() 1720 if f.ErrCode != ErrCodeNo { 1721 sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f) 1722 } else { 1723 sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f) 1724 } 1725 sc.startGracefulShutdownInternal() 1726 // http://tools.ietf.org/html/rfc7540#section-6.8 1727 // We should not create any new streams, which means we should disable push. 1728 sc.pushEnabled = false 1729 return nil 1730} 1731 1732// isPushed reports whether the stream is server-initiated. 1733func (st *stream) isPushed() bool { 1734 return st.id%2 == 0 1735} 1736 1737// endStream closes a Request.Body's pipe. It is called when a DATA 1738// frame says a request body is over (or after trailers). 1739func (st *stream) endStream() { 1740 sc := st.sc 1741 sc.serveG.check() 1742 1743 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes { 1744 st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes", 1745 st.declBodyBytes, st.bodyBytes)) 1746 } else { 1747 st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest) 1748 st.body.CloseWithError(io.EOF) 1749 } 1750 st.state = stateHalfClosedRemote 1751} 1752 1753// copyTrailersToHandlerRequest is run in the Handler's goroutine in 1754// its Request.Body.Read just before it gets io.EOF. 1755func (st *stream) copyTrailersToHandlerRequest() { 1756 for k, vv := range st.trailer { 1757 if _, ok := st.reqTrailer[k]; ok { 1758 // Only copy it over it was pre-declared. 1759 st.reqTrailer[k] = vv 1760 } 1761 } 1762} 1763 1764// onWriteTimeout is run on its own goroutine (from time.AfterFunc) 1765// when the stream's WriteTimeout has fired. 1766func (st *stream) onWriteTimeout() { 1767 st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)}) 1768} 1769 1770func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error { 1771 sc.serveG.check() 1772 id := f.StreamID 1773 if sc.inGoAway { 1774 // Ignore. 1775 return nil 1776 } 1777 // http://tools.ietf.org/html/rfc7540#section-5.1.1 1778 // Streams initiated by a client MUST use odd-numbered stream 1779 // identifiers. [...] An endpoint that receives an unexpected 1780 // stream identifier MUST respond with a connection error 1781 // (Section 5.4.1) of type PROTOCOL_ERROR. 1782 if id%2 != 1 { 1783 return ConnectionError(ErrCodeProtocol) 1784 } 1785 // A HEADERS frame can be used to create a new stream or 1786 // send a trailer for an open one. If we already have a stream 1787 // open, let it process its own HEADERS frame (trailers at this 1788 // point, if it's valid). 1789 if st := sc.streams[f.StreamID]; st != nil { 1790 if st.resetQueued { 1791 // We're sending RST_STREAM to close the stream, so don't bother 1792 // processing this frame. 1793 return nil 1794 } 1795 // RFC 7540, sec 5.1: If an endpoint receives additional frames, other than 1796 // WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in 1797 // this state, it MUST respond with a stream error (Section 5.4.2) of 1798 // type STREAM_CLOSED. 1799 if st.state == stateHalfClosedRemote { 1800 return streamError(id, ErrCodeStreamClosed) 1801 } 1802 return st.processTrailerHeaders(f) 1803 } 1804 1805 // [...] The identifier of a newly established stream MUST be 1806 // numerically greater than all streams that the initiating 1807 // endpoint has opened or reserved. [...] An endpoint that 1808 // receives an unexpected stream identifier MUST respond with 1809 // a connection error (Section 5.4.1) of type PROTOCOL_ERROR. 1810 if id <= sc.maxClientStreamID { 1811 return ConnectionError(ErrCodeProtocol) 1812 } 1813 sc.maxClientStreamID = id 1814 1815 if sc.idleTimer != nil { 1816 sc.idleTimer.Stop() 1817 } 1818 1819 // http://tools.ietf.org/html/rfc7540#section-5.1.2 1820 // [...] Endpoints MUST NOT exceed the limit set by their peer. An 1821 // endpoint that receives a HEADERS frame that causes their 1822 // advertised concurrent stream limit to be exceeded MUST treat 1823 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR 1824 // or REFUSED_STREAM. 1825 if sc.curClientStreams+1 > sc.advMaxStreams { 1826 if sc.unackedSettings == 0 { 1827 // They should know better. 1828 return streamError(id, ErrCodeProtocol) 1829 } 1830 // Assume it's a network race, where they just haven't 1831 // received our last SETTINGS update. But actually 1832 // this can't happen yet, because we don't yet provide 1833 // a way for users to adjust server parameters at 1834 // runtime. 1835 return streamError(id, ErrCodeRefusedStream) 1836 } 1837 1838 initialState := stateOpen 1839 if f.StreamEnded() { 1840 initialState = stateHalfClosedRemote 1841 } 1842 st := sc.newStream(id, 0, initialState) 1843 1844 if f.HasPriority() { 1845 if err := checkPriority(f.StreamID, f.Priority); err != nil { 1846 return err 1847 } 1848 sc.writeSched.AdjustStream(st.id, f.Priority) 1849 } 1850 1851 rw, req, err := sc.newWriterAndRequest(st, f) 1852 if err != nil { 1853 return err 1854 } 1855 st.reqTrailer = req.Trailer 1856 if st.reqTrailer != nil { 1857 st.trailer = make(http.Header) 1858 } 1859 st.body = req.Body.(*requestBody).pipe // may be nil 1860 st.declBodyBytes = req.ContentLength 1861 1862 handler := sc.handler.ServeHTTP 1863 if f.Truncated { 1864 // Their header list was too long. Send a 431 error. 1865 handler = handleHeaderListTooLong 1866 } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil { 1867 handler = new400Handler(err) 1868 } 1869 1870 // The net/http package sets the read deadline from the 1871 // http.Server.ReadTimeout during the TLS handshake, but then 1872 // passes the connection off to us with the deadline already 1873 // set. Disarm it here after the request headers are read, 1874 // similar to how the http1 server works. Here it's 1875 // technically more like the http1 Server's ReadHeaderTimeout 1876 // (in Go 1.8), though. That's a more sane option anyway. 1877 if sc.hs.ReadTimeout != 0 { 1878 sc.conn.SetReadDeadline(time.Time{}) 1879 } 1880 1881 go sc.runHandler(rw, req, handler) 1882 return nil 1883} 1884 1885func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error { 1886 sc := st.sc 1887 sc.serveG.check() 1888 if st.gotTrailerHeader { 1889 return ConnectionError(ErrCodeProtocol) 1890 } 1891 st.gotTrailerHeader = true 1892 if !f.StreamEnded() { 1893 return streamError(st.id, ErrCodeProtocol) 1894 } 1895 1896 if len(f.PseudoFields()) > 0 { 1897 return streamError(st.id, ErrCodeProtocol) 1898 } 1899 if st.trailer != nil { 1900 for _, hf := range f.RegularFields() { 1901 key := sc.canonicalHeader(hf.Name) 1902 if !httpguts.ValidTrailerHeader(key) { 1903 // TODO: send more details to the peer somehow. But http2 has 1904 // no way to send debug data at a stream level. Discuss with 1905 // HTTP folk. 1906 return streamError(st.id, ErrCodeProtocol) 1907 } 1908 st.trailer[key] = append(st.trailer[key], hf.Value) 1909 } 1910 } 1911 st.endStream() 1912 return nil 1913} 1914 1915func checkPriority(streamID uint32, p PriorityParam) error { 1916 if streamID == p.StreamDep { 1917 // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat 1918 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR." 1919 // Section 5.3.3 says that a stream can depend on one of its dependencies, 1920 // so it's only self-dependencies that are forbidden. 1921 return streamError(streamID, ErrCodeProtocol) 1922 } 1923 return nil 1924} 1925 1926func (sc *serverConn) processPriority(f *PriorityFrame) error { 1927 if sc.inGoAway { 1928 return nil 1929 } 1930 if err := checkPriority(f.StreamID, f.PriorityParam); err != nil { 1931 return err 1932 } 1933 sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam) 1934 return nil 1935} 1936 1937func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream { 1938 sc.serveG.check() 1939 if id == 0 { 1940 panic("internal error: cannot create stream with id 0") 1941 } 1942 1943 ctx, cancelCtx := context.WithCancel(sc.baseCtx) 1944 st := &stream{ 1945 sc: sc, 1946 id: id, 1947 state: state, 1948 ctx: ctx, 1949 cancelCtx: cancelCtx, 1950 } 1951 st.cw.Init() 1952 st.flow.conn = &sc.flow // link to conn-level counter 1953 st.flow.add(sc.initialStreamSendWindowSize) 1954 st.inflow.conn = &sc.inflow // link to conn-level counter 1955 st.inflow.add(sc.srv.initialStreamRecvWindowSize()) 1956 if sc.hs.WriteTimeout != 0 { 1957 st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) 1958 } 1959 1960 sc.streams[id] = st 1961 sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID}) 1962 if st.isPushed() { 1963 sc.curPushedStreams++ 1964 } else { 1965 sc.curClientStreams++ 1966 } 1967 if sc.curOpenStreams() == 1 { 1968 sc.setConnState(http.StateActive) 1969 } 1970 1971 return st 1972} 1973 1974func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) { 1975 sc.serveG.check() 1976 1977 rp := requestParam{ 1978 method: f.PseudoValue("method"), 1979 scheme: f.PseudoValue("scheme"), 1980 authority: f.PseudoValue("authority"), 1981 path: f.PseudoValue("path"), 1982 } 1983 1984 isConnect := rp.method == "CONNECT" 1985 if isConnect { 1986 if rp.path != "" || rp.scheme != "" || rp.authority == "" { 1987 return nil, nil, streamError(f.StreamID, ErrCodeProtocol) 1988 } 1989 } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") { 1990 // See 8.1.2.6 Malformed Requests and Responses: 1991 // 1992 // Malformed requests or responses that are detected 1993 // MUST be treated as a stream error (Section 5.4.2) 1994 // of type PROTOCOL_ERROR." 1995 // 1996 // 8.1.2.3 Request Pseudo-Header Fields 1997 // "All HTTP/2 requests MUST include exactly one valid 1998 // value for the :method, :scheme, and :path 1999 // pseudo-header fields" 2000 return nil, nil, streamError(f.StreamID, ErrCodeProtocol) 2001 } 2002 2003 bodyOpen := !f.StreamEnded() 2004 if rp.method == "HEAD" && bodyOpen { 2005 // HEAD requests can't have bodies 2006 return nil, nil, streamError(f.StreamID, ErrCodeProtocol) 2007 } 2008 2009 rp.header = make(http.Header) 2010 for _, hf := range f.RegularFields() { 2011 rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value) 2012 } 2013 if rp.authority == "" { 2014 rp.authority = rp.header.Get("Host") 2015 } 2016 2017 rw, req, err := sc.newWriterAndRequestNoBody(st, rp) 2018 if err != nil { 2019 return nil, nil, err 2020 } 2021 if bodyOpen { 2022 if vv, ok := rp.header["Content-Length"]; ok { 2023 req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) 2024 } else { 2025 req.ContentLength = -1 2026 } 2027 req.Body.(*requestBody).pipe = &pipe{ 2028 b: &dataBuffer{expected: req.ContentLength}, 2029 } 2030 } 2031 return rw, req, nil 2032} 2033 2034type requestParam struct { 2035 method string 2036 scheme, authority, path string 2037 header http.Header 2038} 2039 2040func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) { 2041 sc.serveG.check() 2042 2043 var tlsState *tls.ConnectionState // nil if not scheme https 2044 if rp.scheme == "https" { 2045 tlsState = sc.tlsState 2046 } 2047 2048 needsContinue := rp.header.Get("Expect") == "100-continue" 2049 if needsContinue { 2050 rp.header.Del("Expect") 2051 } 2052 // Merge Cookie headers into one "; "-delimited value. 2053 if cookies := rp.header["Cookie"]; len(cookies) > 1 { 2054 rp.header.Set("Cookie", strings.Join(cookies, "; ")) 2055 } 2056 2057 // Setup Trailers 2058 var trailer http.Header 2059 for _, v := range rp.header["Trailer"] { 2060 for _, key := range strings.Split(v, ",") { 2061 key = http.CanonicalHeaderKey(textproto.TrimString(key)) 2062 switch key { 2063 case "Transfer-Encoding", "Trailer", "Content-Length": 2064 // Bogus. (copy of http1 rules) 2065 // Ignore. 2066 default: 2067 if trailer == nil { 2068 trailer = make(http.Header) 2069 } 2070 trailer[key] = nil 2071 } 2072 } 2073 } 2074 delete(rp.header, "Trailer") 2075 2076 var url_ *url.URL 2077 var requestURI string 2078 if rp.method == "CONNECT" { 2079 url_ = &url.URL{Host: rp.authority} 2080 requestURI = rp.authority // mimic HTTP/1 server behavior 2081 } else { 2082 var err error 2083 url_, err = url.ParseRequestURI(rp.path) 2084 if err != nil { 2085 return nil, nil, streamError(st.id, ErrCodeProtocol) 2086 } 2087 requestURI = rp.path 2088 } 2089 2090 body := &requestBody{ 2091 conn: sc, 2092 stream: st, 2093 needsContinue: needsContinue, 2094 } 2095 req := &http.Request{ 2096 Method: rp.method, 2097 URL: url_, 2098 RemoteAddr: sc.remoteAddrStr, 2099 Header: rp.header, 2100 RequestURI: requestURI, 2101 Proto: "HTTP/2.0", 2102 ProtoMajor: 2, 2103 ProtoMinor: 0, 2104 TLS: tlsState, 2105 Host: rp.authority, 2106 Body: body, 2107 Trailer: trailer, 2108 } 2109 req = req.WithContext(st.ctx) 2110 2111 rws := responseWriterStatePool.Get().(*responseWriterState) 2112 bwSave := rws.bw 2113 *rws = responseWriterState{} // zero all the fields 2114 rws.conn = sc 2115 rws.bw = bwSave 2116 rws.bw.Reset(chunkWriter{rws}) 2117 rws.stream = st 2118 rws.req = req 2119 rws.body = body 2120 2121 rw := &responseWriter{rws: rws} 2122 return rw, req, nil 2123} 2124 2125// Run on its own goroutine. 2126func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) { 2127 didPanic := true 2128 defer func() { 2129 rw.rws.stream.cancelCtx() 2130 if didPanic { 2131 e := recover() 2132 sc.writeFrameFromHandler(FrameWriteRequest{ 2133 write: handlerPanicRST{rw.rws.stream.id}, 2134 stream: rw.rws.stream, 2135 }) 2136 // Same as net/http: 2137 if e != nil && e != http.ErrAbortHandler { 2138 const size = 64 << 10 2139 buf := make([]byte, size) 2140 buf = buf[:runtime.Stack(buf, false)] 2141 sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf) 2142 } 2143 return 2144 } 2145 rw.handlerDone() 2146 }() 2147 handler(rw, req) 2148 didPanic = false 2149} 2150 2151func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) { 2152 // 10.5.1 Limits on Header Block Size: 2153 // .. "A server that receives a larger header block than it is 2154 // willing to handle can send an HTTP 431 (Request Header Fields Too 2155 // Large) status code" 2156 const statusRequestHeaderFieldsTooLarge = 431 // only in Go 1.6+ 2157 w.WriteHeader(statusRequestHeaderFieldsTooLarge) 2158 io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>") 2159} 2160 2161// called from handler goroutines. 2162// h may be nil. 2163func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error { 2164 sc.serveG.checkNotOn() // NOT on 2165 var errc chan error 2166 if headerData.h != nil { 2167 // If there's a header map (which we don't own), so we have to block on 2168 // waiting for this frame to be written, so an http.Flush mid-handler 2169 // writes out the correct value of keys, before a handler later potentially 2170 // mutates it. 2171 errc = errChanPool.Get().(chan error) 2172 } 2173 if err := sc.writeFrameFromHandler(FrameWriteRequest{ 2174 write: headerData, 2175 stream: st, 2176 done: errc, 2177 }); err != nil { 2178 return err 2179 } 2180 if errc != nil { 2181 select { 2182 case err := <-errc: 2183 errChanPool.Put(errc) 2184 return err 2185 case <-sc.doneServing: 2186 return errClientDisconnected 2187 case <-st.cw: 2188 return errStreamClosed 2189 } 2190 } 2191 return nil 2192} 2193 2194// called from handler goroutines. 2195func (sc *serverConn) write100ContinueHeaders(st *stream) { 2196 sc.writeFrameFromHandler(FrameWriteRequest{ 2197 write: write100ContinueHeadersFrame{st.id}, 2198 stream: st, 2199 }) 2200} 2201 2202// A bodyReadMsg tells the server loop that the http.Handler read n 2203// bytes of the DATA from the client on the given stream. 2204type bodyReadMsg struct { 2205 st *stream 2206 n int 2207} 2208 2209// called from handler goroutines. 2210// Notes that the handler for the given stream ID read n bytes of its body 2211// and schedules flow control tokens to be sent. 2212func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) { 2213 sc.serveG.checkNotOn() // NOT on 2214 if n > 0 { 2215 select { 2216 case sc.bodyReadCh <- bodyReadMsg{st, n}: 2217 case <-sc.doneServing: 2218 } 2219 } 2220} 2221 2222func (sc *serverConn) noteBodyRead(st *stream, n int) { 2223 sc.serveG.check() 2224 sc.sendWindowUpdate(nil, n) // conn-level 2225 if st.state != stateHalfClosedRemote && st.state != stateClosed { 2226 // Don't send this WINDOW_UPDATE if the stream is closed 2227 // remotely. 2228 sc.sendWindowUpdate(st, n) 2229 } 2230} 2231 2232// st may be nil for conn-level 2233func (sc *serverConn) sendWindowUpdate(st *stream, n int) { 2234 sc.serveG.check() 2235 // "The legal range for the increment to the flow control 2236 // window is 1 to 2^31-1 (2,147,483,647) octets." 2237 // A Go Read call on 64-bit machines could in theory read 2238 // a larger Read than this. Very unlikely, but we handle it here 2239 // rather than elsewhere for now. 2240 const maxUint31 = 1<<31 - 1 2241 for n >= maxUint31 { 2242 sc.sendWindowUpdate32(st, maxUint31) 2243 n -= maxUint31 2244 } 2245 sc.sendWindowUpdate32(st, int32(n)) 2246} 2247 2248// st may be nil for conn-level 2249func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) { 2250 sc.serveG.check() 2251 if n == 0 { 2252 return 2253 } 2254 if n < 0 { 2255 panic("negative update") 2256 } 2257 var streamID uint32 2258 if st != nil { 2259 streamID = st.id 2260 } 2261 sc.writeFrame(FrameWriteRequest{ 2262 write: writeWindowUpdate{streamID: streamID, n: uint32(n)}, 2263 stream: st, 2264 }) 2265 var ok bool 2266 if st == nil { 2267 ok = sc.inflow.add(n) 2268 } else { 2269 ok = st.inflow.add(n) 2270 } 2271 if !ok { 2272 panic("internal error; sent too many window updates without decrements?") 2273 } 2274} 2275 2276// requestBody is the Handler's Request.Body type. 2277// Read and Close may be called concurrently. 2278type requestBody struct { 2279 _ incomparable 2280 stream *stream 2281 conn *serverConn 2282 closed bool // for use by Close only 2283 sawEOF bool // for use by Read only 2284 pipe *pipe // non-nil if we have a HTTP entity message body 2285 needsContinue bool // need to send a 100-continue 2286} 2287 2288func (b *requestBody) Close() error { 2289 if b.pipe != nil && !b.closed { 2290 b.pipe.BreakWithError(errClosedBody) 2291 } 2292 b.closed = true 2293 return nil 2294} 2295 2296func (b *requestBody) Read(p []byte) (n int, err error) { 2297 if b.needsContinue { 2298 b.needsContinue = false 2299 b.conn.write100ContinueHeaders(b.stream) 2300 } 2301 if b.pipe == nil || b.sawEOF { 2302 return 0, io.EOF 2303 } 2304 n, err = b.pipe.Read(p) 2305 if err == io.EOF { 2306 b.sawEOF = true 2307 } 2308 if b.conn == nil && inTests { 2309 return 2310 } 2311 b.conn.noteBodyReadFromHandler(b.stream, n, err) 2312 return 2313} 2314 2315// responseWriter is the http.ResponseWriter implementation. It's 2316// intentionally small (1 pointer wide) to minimize garbage. The 2317// responseWriterState pointer inside is zeroed at the end of a 2318// request (in handlerDone) and calls on the responseWriter thereafter 2319// simply crash (caller's mistake), but the much larger responseWriterState 2320// and buffers are reused between multiple requests. 2321type responseWriter struct { 2322 rws *responseWriterState 2323} 2324 2325// Optional http.ResponseWriter interfaces implemented. 2326var ( 2327 _ http.CloseNotifier = (*responseWriter)(nil) 2328 _ http.Flusher = (*responseWriter)(nil) 2329 _ stringWriter = (*responseWriter)(nil) 2330) 2331 2332type responseWriterState struct { 2333 // immutable within a request: 2334 stream *stream 2335 req *http.Request 2336 body *requestBody // to close at end of request, if DATA frames didn't 2337 conn *serverConn 2338 2339 // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc 2340 bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState} 2341 2342 // mutated by http.Handler goroutine: 2343 handlerHeader http.Header // nil until called 2344 snapHeader http.Header // snapshot of handlerHeader at WriteHeader time 2345 trailers []string // set in writeChunk 2346 status int // status code passed to WriteHeader 2347 wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet. 2348 sentHeader bool // have we sent the header frame? 2349 handlerDone bool // handler has finished 2350 dirty bool // a Write failed; don't reuse this responseWriterState 2351 2352 sentContentLen int64 // non-zero if handler set a Content-Length header 2353 wroteBytes int64 2354 2355 closeNotifierMu sync.Mutex // guards closeNotifierCh 2356 closeNotifierCh chan bool // nil until first used 2357} 2358 2359type chunkWriter struct{ rws *responseWriterState } 2360 2361func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) } 2362 2363func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 } 2364 2365func (rws *responseWriterState) hasNonemptyTrailers() bool { 2366 for _, trailer := range rws.trailers { 2367 if _, ok := rws.handlerHeader[trailer]; ok { 2368 return true 2369 } 2370 } 2371 return false 2372} 2373 2374// declareTrailer is called for each Trailer header when the 2375// response header is written. It notes that a header will need to be 2376// written in the trailers at the end of the response. 2377func (rws *responseWriterState) declareTrailer(k string) { 2378 k = http.CanonicalHeaderKey(k) 2379 if !httpguts.ValidTrailerHeader(k) { 2380 // Forbidden by RFC 7230, section 4.1.2. 2381 rws.conn.logf("ignoring invalid trailer %q", k) 2382 return 2383 } 2384 if !strSliceContains(rws.trailers, k) { 2385 rws.trailers = append(rws.trailers, k) 2386 } 2387} 2388 2389// writeChunk writes chunks from the bufio.Writer. But because 2390// bufio.Writer may bypass its chunking, sometimes p may be 2391// arbitrarily large. 2392// 2393// writeChunk is also responsible (on the first chunk) for sending the 2394// HEADER response. 2395func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { 2396 if !rws.wroteHeader { 2397 rws.writeHeader(200) 2398 } 2399 2400 isHeadResp := rws.req.Method == "HEAD" 2401 if !rws.sentHeader { 2402 rws.sentHeader = true 2403 var ctype, clen string 2404 if clen = rws.snapHeader.Get("Content-Length"); clen != "" { 2405 rws.snapHeader.Del("Content-Length") 2406 clen64, err := strconv.ParseInt(clen, 10, 64) 2407 if err == nil && clen64 >= 0 { 2408 rws.sentContentLen = clen64 2409 } else { 2410 clen = "" 2411 } 2412 } 2413 if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) { 2414 clen = strconv.Itoa(len(p)) 2415 } 2416 _, hasContentType := rws.snapHeader["Content-Type"] 2417 // If the Content-Encoding is non-blank, we shouldn't 2418 // sniff the body. See Issue golang.org/issue/31753. 2419 ce := rws.snapHeader.Get("Content-Encoding") 2420 hasCE := len(ce) > 0 2421 if !hasCE && !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 { 2422 ctype = http.DetectContentType(p) 2423 } 2424 var date string 2425 if _, ok := rws.snapHeader["Date"]; !ok { 2426 // TODO(bradfitz): be faster here, like net/http? measure. 2427 date = time.Now().UTC().Format(http.TimeFormat) 2428 } 2429 2430 for _, v := range rws.snapHeader["Trailer"] { 2431 foreachHeaderElement(v, rws.declareTrailer) 2432 } 2433 2434 // "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2), 2435 // but respect "Connection" == "close" to mean sending a GOAWAY and tearing 2436 // down the TCP connection when idle, like we do for HTTP/1. 2437 // TODO: remove more Connection-specific header fields here, in addition 2438 // to "Connection". 2439 if _, ok := rws.snapHeader["Connection"]; ok { 2440 v := rws.snapHeader.Get("Connection") 2441 delete(rws.snapHeader, "Connection") 2442 if v == "close" { 2443 rws.conn.startGracefulShutdown() 2444 } 2445 } 2446 2447 endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp 2448 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{ 2449 streamID: rws.stream.id, 2450 httpResCode: rws.status, 2451 h: rws.snapHeader, 2452 endStream: endStream, 2453 contentType: ctype, 2454 contentLength: clen, 2455 date: date, 2456 }) 2457 if err != nil { 2458 rws.dirty = true 2459 return 0, err 2460 } 2461 if endStream { 2462 return 0, nil 2463 } 2464 } 2465 if isHeadResp { 2466 return len(p), nil 2467 } 2468 if len(p) == 0 && !rws.handlerDone { 2469 return 0, nil 2470 } 2471 2472 if rws.handlerDone { 2473 rws.promoteUndeclaredTrailers() 2474 } 2475 2476 // only send trailers if they have actually been defined by the 2477 // server handler. 2478 hasNonemptyTrailers := rws.hasNonemptyTrailers() 2479 endStream := rws.handlerDone && !hasNonemptyTrailers 2480 if len(p) > 0 || endStream { 2481 // only send a 0 byte DATA frame if we're ending the stream. 2482 if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil { 2483 rws.dirty = true 2484 return 0, err 2485 } 2486 } 2487 2488 if rws.handlerDone && hasNonemptyTrailers { 2489 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{ 2490 streamID: rws.stream.id, 2491 h: rws.handlerHeader, 2492 trailers: rws.trailers, 2493 endStream: true, 2494 }) 2495 if err != nil { 2496 rws.dirty = true 2497 } 2498 return len(p), err 2499 } 2500 return len(p), nil 2501} 2502 2503// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys 2504// that, if present, signals that the map entry is actually for 2505// the response trailers, and not the response headers. The prefix 2506// is stripped after the ServeHTTP call finishes and the values are 2507// sent in the trailers. 2508// 2509// This mechanism is intended only for trailers that are not known 2510// prior to the headers being written. If the set of trailers is fixed 2511// or known before the header is written, the normal Go trailers mechanism 2512// is preferred: 2513// https://golang.org/pkg/net/http/#ResponseWriter 2514// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers 2515const TrailerPrefix = "Trailer:" 2516 2517// promoteUndeclaredTrailers permits http.Handlers to set trailers 2518// after the header has already been flushed. Because the Go 2519// ResponseWriter interface has no way to set Trailers (only the 2520// Header), and because we didn't want to expand the ResponseWriter 2521// interface, and because nobody used trailers, and because RFC 7230 2522// says you SHOULD (but not must) predeclare any trailers in the 2523// header, the official ResponseWriter rules said trailers in Go must 2524// be predeclared, and then we reuse the same ResponseWriter.Header() 2525// map to mean both Headers and Trailers. When it's time to write the 2526// Trailers, we pick out the fields of Headers that were declared as 2527// trailers. That worked for a while, until we found the first major 2528// user of Trailers in the wild: gRPC (using them only over http2), 2529// and gRPC libraries permit setting trailers mid-stream without 2530// predeclaring them. So: change of plans. We still permit the old 2531// way, but we also permit this hack: if a Header() key begins with 2532// "Trailer:", the suffix of that key is a Trailer. Because ':' is an 2533// invalid token byte anyway, there is no ambiguity. (And it's already 2534// filtered out) It's mildly hacky, but not terrible. 2535// 2536// This method runs after the Handler is done and promotes any Header 2537// fields to be trailers. 2538func (rws *responseWriterState) promoteUndeclaredTrailers() { 2539 for k, vv := range rws.handlerHeader { 2540 if !strings.HasPrefix(k, TrailerPrefix) { 2541 continue 2542 } 2543 trailerKey := strings.TrimPrefix(k, TrailerPrefix) 2544 rws.declareTrailer(trailerKey) 2545 rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv 2546 } 2547 2548 if len(rws.trailers) > 1 { 2549 sorter := sorterPool.Get().(*sorter) 2550 sorter.SortStrings(rws.trailers) 2551 sorterPool.Put(sorter) 2552 } 2553} 2554 2555func (w *responseWriter) Flush() { 2556 rws := w.rws 2557 if rws == nil { 2558 panic("Header called after Handler finished") 2559 } 2560 if rws.bw.Buffered() > 0 { 2561 if err := rws.bw.Flush(); err != nil { 2562 // Ignore the error. The frame writer already knows. 2563 return 2564 } 2565 } else { 2566 // The bufio.Writer won't call chunkWriter.Write 2567 // (writeChunk with zero bytes, so we have to do it 2568 // ourselves to force the HTTP response header and/or 2569 // final DATA frame (with END_STREAM) to be sent. 2570 rws.writeChunk(nil) 2571 } 2572} 2573 2574func (w *responseWriter) CloseNotify() <-chan bool { 2575 rws := w.rws 2576 if rws == nil { 2577 panic("CloseNotify called after Handler finished") 2578 } 2579 rws.closeNotifierMu.Lock() 2580 ch := rws.closeNotifierCh 2581 if ch == nil { 2582 ch = make(chan bool, 1) 2583 rws.closeNotifierCh = ch 2584 cw := rws.stream.cw 2585 go func() { 2586 cw.Wait() // wait for close 2587 ch <- true 2588 }() 2589 } 2590 rws.closeNotifierMu.Unlock() 2591 return ch 2592} 2593 2594func (w *responseWriter) Header() http.Header { 2595 rws := w.rws 2596 if rws == nil { 2597 panic("Header called after Handler finished") 2598 } 2599 if rws.handlerHeader == nil { 2600 rws.handlerHeader = make(http.Header) 2601 } 2602 return rws.handlerHeader 2603} 2604 2605// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode. 2606func checkWriteHeaderCode(code int) { 2607 // Issue 22880: require valid WriteHeader status codes. 2608 // For now we only enforce that it's three digits. 2609 // In the future we might block things over 599 (600 and above aren't defined 2610 // at http://httpwg.org/specs/rfc7231.html#status.codes) 2611 // and we might block under 200 (once we have more mature 1xx support). 2612 // But for now any three digits. 2613 // 2614 // We used to send "HTTP/1.1 000 0" on the wire in responses but there's 2615 // no equivalent bogus thing we can realistically send in HTTP/2, 2616 // so we'll consistently panic instead and help people find their bugs 2617 // early. (We can't return an error from WriteHeader even if we wanted to.) 2618 if code < 100 || code > 999 { 2619 panic(fmt.Sprintf("invalid WriteHeader code %v", code)) 2620 } 2621} 2622 2623func (w *responseWriter) WriteHeader(code int) { 2624 rws := w.rws 2625 if rws == nil { 2626 panic("WriteHeader called after Handler finished") 2627 } 2628 rws.writeHeader(code) 2629} 2630 2631func (rws *responseWriterState) writeHeader(code int) { 2632 if !rws.wroteHeader { 2633 checkWriteHeaderCode(code) 2634 rws.wroteHeader = true 2635 rws.status = code 2636 if len(rws.handlerHeader) > 0 { 2637 rws.snapHeader = cloneHeader(rws.handlerHeader) 2638 } 2639 } 2640} 2641 2642func cloneHeader(h http.Header) http.Header { 2643 h2 := make(http.Header, len(h)) 2644 for k, vv := range h { 2645 vv2 := make([]string, len(vv)) 2646 copy(vv2, vv) 2647 h2[k] = vv2 2648 } 2649 return h2 2650} 2651 2652// The Life Of A Write is like this: 2653// 2654// * Handler calls w.Write or w.WriteString -> 2655// * -> rws.bw (*bufio.Writer) -> 2656// * (Handler might call Flush) 2657// * -> chunkWriter{rws} 2658// * -> responseWriterState.writeChunk(p []byte) 2659// * -> responseWriterState.writeChunk (most of the magic; see comment there) 2660func (w *responseWriter) Write(p []byte) (n int, err error) { 2661 return w.write(len(p), p, "") 2662} 2663 2664func (w *responseWriter) WriteString(s string) (n int, err error) { 2665 return w.write(len(s), nil, s) 2666} 2667 2668// either dataB or dataS is non-zero. 2669func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) { 2670 rws := w.rws 2671 if rws == nil { 2672 panic("Write called after Handler finished") 2673 } 2674 if !rws.wroteHeader { 2675 w.WriteHeader(200) 2676 } 2677 if !bodyAllowedForStatus(rws.status) { 2678 return 0, http.ErrBodyNotAllowed 2679 } 2680 rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set 2681 if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen { 2682 // TODO: send a RST_STREAM 2683 return 0, errors.New("http2: handler wrote more than declared Content-Length") 2684 } 2685 2686 if dataB != nil { 2687 return rws.bw.Write(dataB) 2688 } else { 2689 return rws.bw.WriteString(dataS) 2690 } 2691} 2692 2693func (w *responseWriter) handlerDone() { 2694 rws := w.rws 2695 dirty := rws.dirty 2696 rws.handlerDone = true 2697 w.Flush() 2698 w.rws = nil 2699 if !dirty { 2700 // Only recycle the pool if all prior Write calls to 2701 // the serverConn goroutine completed successfully. If 2702 // they returned earlier due to resets from the peer 2703 // there might still be write goroutines outstanding 2704 // from the serverConn referencing the rws memory. See 2705 // issue 20704. 2706 responseWriterStatePool.Put(rws) 2707 } 2708} 2709 2710// Push errors. 2711var ( 2712 ErrRecursivePush = errors.New("http2: recursive push not allowed") 2713 ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS") 2714) 2715 2716var _ http.Pusher = (*responseWriter)(nil) 2717 2718func (w *responseWriter) Push(target string, opts *http.PushOptions) error { 2719 st := w.rws.stream 2720 sc := st.sc 2721 sc.serveG.checkNotOn() 2722 2723 // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream." 2724 // http://tools.ietf.org/html/rfc7540#section-6.6 2725 if st.isPushed() { 2726 return ErrRecursivePush 2727 } 2728 2729 if opts == nil { 2730 opts = new(http.PushOptions) 2731 } 2732 2733 // Default options. 2734 if opts.Method == "" { 2735 opts.Method = "GET" 2736 } 2737 if opts.Header == nil { 2738 opts.Header = http.Header{} 2739 } 2740 wantScheme := "http" 2741 if w.rws.req.TLS != nil { 2742 wantScheme = "https" 2743 } 2744 2745 // Validate the request. 2746 u, err := url.Parse(target) 2747 if err != nil { 2748 return err 2749 } 2750 if u.Scheme == "" { 2751 if !strings.HasPrefix(target, "/") { 2752 return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target) 2753 } 2754 u.Scheme = wantScheme 2755 u.Host = w.rws.req.Host 2756 } else { 2757 if u.Scheme != wantScheme { 2758 return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme) 2759 } 2760 if u.Host == "" { 2761 return errors.New("URL must have a host") 2762 } 2763 } 2764 for k := range opts.Header { 2765 if strings.HasPrefix(k, ":") { 2766 return fmt.Errorf("promised request headers cannot include pseudo header %q", k) 2767 } 2768 // These headers are meaningful only if the request has a body, 2769 // but PUSH_PROMISE requests cannot have a body. 2770 // http://tools.ietf.org/html/rfc7540#section-8.2 2771 // Also disallow Host, since the promised URL must be absolute. 2772 switch strings.ToLower(k) { 2773 case "content-length", "content-encoding", "trailer", "te", "expect", "host": 2774 return fmt.Errorf("promised request headers cannot include %q", k) 2775 } 2776 } 2777 if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil { 2778 return err 2779 } 2780 2781 // The RFC effectively limits promised requests to GET and HEAD: 2782 // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]" 2783 // http://tools.ietf.org/html/rfc7540#section-8.2 2784 if opts.Method != "GET" && opts.Method != "HEAD" { 2785 return fmt.Errorf("method %q must be GET or HEAD", opts.Method) 2786 } 2787 2788 msg := &startPushRequest{ 2789 parent: st, 2790 method: opts.Method, 2791 url: u, 2792 header: cloneHeader(opts.Header), 2793 done: errChanPool.Get().(chan error), 2794 } 2795 2796 select { 2797 case <-sc.doneServing: 2798 return errClientDisconnected 2799 case <-st.cw: 2800 return errStreamClosed 2801 case sc.serveMsgCh <- msg: 2802 } 2803 2804 select { 2805 case <-sc.doneServing: 2806 return errClientDisconnected 2807 case <-st.cw: 2808 return errStreamClosed 2809 case err := <-msg.done: 2810 errChanPool.Put(msg.done) 2811 return err 2812 } 2813} 2814 2815type startPushRequest struct { 2816 parent *stream 2817 method string 2818 url *url.URL 2819 header http.Header 2820 done chan error 2821} 2822 2823func (sc *serverConn) startPush(msg *startPushRequest) { 2824 sc.serveG.check() 2825 2826 // http://tools.ietf.org/html/rfc7540#section-6.6. 2827 // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that 2828 // is in either the "open" or "half-closed (remote)" state. 2829 if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote { 2830 // responseWriter.Push checks that the stream is peer-initiated. 2831 msg.done <- errStreamClosed 2832 return 2833 } 2834 2835 // http://tools.ietf.org/html/rfc7540#section-6.6. 2836 if !sc.pushEnabled { 2837 msg.done <- http.ErrNotSupported 2838 return 2839 } 2840 2841 // PUSH_PROMISE frames must be sent in increasing order by stream ID, so 2842 // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE 2843 // is written. Once the ID is allocated, we start the request handler. 2844 allocatePromisedID := func() (uint32, error) { 2845 sc.serveG.check() 2846 2847 // Check this again, just in case. Technically, we might have received 2848 // an updated SETTINGS by the time we got around to writing this frame. 2849 if !sc.pushEnabled { 2850 return 0, http.ErrNotSupported 2851 } 2852 // http://tools.ietf.org/html/rfc7540#section-6.5.2. 2853 if sc.curPushedStreams+1 > sc.clientMaxStreams { 2854 return 0, ErrPushLimitReached 2855 } 2856 2857 // http://tools.ietf.org/html/rfc7540#section-5.1.1. 2858 // Streams initiated by the server MUST use even-numbered identifiers. 2859 // A server that is unable to establish a new stream identifier can send a GOAWAY 2860 // frame so that the client is forced to open a new connection for new streams. 2861 if sc.maxPushPromiseID+2 >= 1<<31 { 2862 sc.startGracefulShutdownInternal() 2863 return 0, ErrPushLimitReached 2864 } 2865 sc.maxPushPromiseID += 2 2866 promisedID := sc.maxPushPromiseID 2867 2868 // http://tools.ietf.org/html/rfc7540#section-8.2. 2869 // Strictly speaking, the new stream should start in "reserved (local)", then 2870 // transition to "half closed (remote)" after sending the initial HEADERS, but 2871 // we start in "half closed (remote)" for simplicity. 2872 // See further comments at the definition of stateHalfClosedRemote. 2873 promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote) 2874 rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{ 2875 method: msg.method, 2876 scheme: msg.url.Scheme, 2877 authority: msg.url.Host, 2878 path: msg.url.RequestURI(), 2879 header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE 2880 }) 2881 if err != nil { 2882 // Should not happen, since we've already validated msg.url. 2883 panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err)) 2884 } 2885 2886 go sc.runHandler(rw, req, sc.handler.ServeHTTP) 2887 return promisedID, nil 2888 } 2889 2890 sc.writeFrame(FrameWriteRequest{ 2891 write: &writePushPromise{ 2892 streamID: msg.parent.id, 2893 method: msg.method, 2894 url: msg.url, 2895 h: msg.header, 2896 allocatePromisedID: allocatePromisedID, 2897 }, 2898 stream: msg.parent, 2899 done: msg.done, 2900 }) 2901} 2902 2903// foreachHeaderElement splits v according to the "#rule" construction 2904// in RFC 7230 section 7 and calls fn for each non-empty element. 2905func foreachHeaderElement(v string, fn func(string)) { 2906 v = textproto.TrimString(v) 2907 if v == "" { 2908 return 2909 } 2910 if !strings.Contains(v, ",") { 2911 fn(v) 2912 return 2913 } 2914 for _, f := range strings.Split(v, ",") { 2915 if f = textproto.TrimString(f); f != "" { 2916 fn(f) 2917 } 2918 } 2919} 2920 2921// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2 2922var connHeaders = []string{ 2923 "Connection", 2924 "Keep-Alive", 2925 "Proxy-Connection", 2926 "Transfer-Encoding", 2927 "Upgrade", 2928} 2929 2930// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request, 2931// per RFC 7540 Section 8.1.2.2. 2932// The returned error is reported to users. 2933func checkValidHTTP2RequestHeaders(h http.Header) error { 2934 for _, k := range connHeaders { 2935 if _, ok := h[k]; ok { 2936 return fmt.Errorf("request header %q is not valid in HTTP/2", k) 2937 } 2938 } 2939 te := h["Te"] 2940 if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) { 2941 return errors.New(`request header "TE" may only be "trailers" in HTTP/2`) 2942 } 2943 return nil 2944} 2945 2946func new400Handler(err error) http.HandlerFunc { 2947 return func(w http.ResponseWriter, r *http.Request) { 2948 http.Error(w, err.Error(), http.StatusBadRequest) 2949 } 2950} 2951 2952// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives 2953// disabled. See comments on h1ServerShutdownChan above for why 2954// the code is written this way. 2955func h1ServerKeepAlivesDisabled(hs *http.Server) bool { 2956 var x interface{} = hs 2957 type I interface { 2958 doKeepAlives() bool 2959 } 2960 if hs, ok := x.(I); ok { 2961 return !hs.doKeepAlives() 2962 } 2963 return false 2964} 2965