1// Copyright 2015 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5// Transport code. 6 7package http2 8 9import ( 10 "bufio" 11 "bytes" 12 "compress/gzip" 13 "crypto/tls" 14 "errors" 15 "fmt" 16 "io" 17 "io/ioutil" 18 "log" 19 "net" 20 "net/http" 21 "sort" 22 "strconv" 23 "strings" 24 "sync" 25 "time" 26 27 "golang.org/x/net/http2/hpack" 28) 29 30const ( 31 // transportDefaultConnFlow is how many connection-level flow control 32 // tokens we give the server at start-up, past the default 64k. 33 transportDefaultConnFlow = 1 << 30 34 35 // transportDefaultStreamFlow is how many stream-level flow 36 // control tokens we announce to the peer, and how many bytes 37 // we buffer per stream. 38 transportDefaultStreamFlow = 4 << 20 39 40 // transportDefaultStreamMinRefresh is the minimum number of bytes we'll send 41 // a stream-level WINDOW_UPDATE for at a time. 42 transportDefaultStreamMinRefresh = 4 << 10 43 44 defaultUserAgent = "Go-http-client/2.0" 45) 46 47// Transport is an HTTP/2 Transport. 48// 49// A Transport internally caches connections to servers. It is safe 50// for concurrent use by multiple goroutines. 51type Transport struct { 52 // DialTLS specifies an optional dial function for creating 53 // TLS connections for requests. 54 // 55 // If DialTLS is nil, tls.Dial is used. 56 // 57 // If the returned net.Conn has a ConnectionState method like tls.Conn, 58 // it will be used to set http.Response.TLS. 59 DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error) 60 61 // TLSClientConfig specifies the TLS configuration to use with 62 // tls.Client. If nil, the default configuration is used. 63 TLSClientConfig *tls.Config 64 65 // ConnPool optionally specifies an alternate connection pool to use. 66 // If nil, the default is used. 
67 ConnPool ClientConnPool 68 69 // DisableCompression, if true, prevents the Transport from 70 // requesting compression with an "Accept-Encoding: gzip" 71 // request header when the Request contains no existing 72 // Accept-Encoding value. If the Transport requests gzip on 73 // its own and gets a gzipped response, it's transparently 74 // decoded in the Response.Body. However, if the user 75 // explicitly requested gzip it is not automatically 76 // uncompressed. 77 DisableCompression bool 78 79 // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to 80 // send in the initial settings frame. It is how many bytes 81 // of response headers are allow. Unlike the http2 spec, zero here 82 // means to use a default limit (currently 10MB). If you actually 83 // want to advertise an ulimited value to the peer, Transport 84 // interprets the highest possible value here (0xffffffff or 1<<32-1) 85 // to mean no limit. 86 MaxHeaderListSize uint32 87 88 // t1, if non-nil, is the standard library Transport using 89 // this transport. Its settings are used (but not its 90 // RoundTrip method, etc). 91 t1 *http.Transport 92 93 connPoolOnce sync.Once 94 connPoolOrDef ClientConnPool // non-nil version of ConnPool 95} 96 97func (t *Transport) maxHeaderListSize() uint32 { 98 if t.MaxHeaderListSize == 0 { 99 return 10 << 20 100 } 101 if t.MaxHeaderListSize == 0xffffffff { 102 return 0 103 } 104 return t.MaxHeaderListSize 105} 106 107func (t *Transport) disableCompression() bool { 108 return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression) 109} 110 111var errTransportVersion = errors.New("http2: ConfigureTransport is only supported starting at Go 1.6") 112 113// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2. 114// It requires Go 1.6 or later and returns an error if the net/http package is too old 115// or if t1 has already been HTTP/2-enabled. 
116func ConfigureTransport(t1 *http.Transport) error { 117 _, err := configureTransport(t1) // in configure_transport.go (go1.6) or not_go16.go 118 return err 119} 120 121func (t *Transport) connPool() ClientConnPool { 122 t.connPoolOnce.Do(t.initConnPool) 123 return t.connPoolOrDef 124} 125 126func (t *Transport) initConnPool() { 127 if t.ConnPool != nil { 128 t.connPoolOrDef = t.ConnPool 129 } else { 130 t.connPoolOrDef = &clientConnPool{t: t} 131 } 132} 133 134// ClientConn is the state of a single HTTP/2 client connection to an 135// HTTP/2 server. 136type ClientConn struct { 137 t *Transport 138 tconn net.Conn // usually *tls.Conn, except specialized impls 139 tlsState *tls.ConnectionState // nil only for specialized impls 140 141 // readLoop goroutine fields: 142 readerDone chan struct{} // closed on error 143 readerErr error // set before readerDone is closed 144 145 mu sync.Mutex // guards following 146 cond *sync.Cond // hold mu; broadcast on flow/closed changes 147 flow flow // our conn-level flow control quota (cs.flow is per stream) 148 inflow flow // peer's conn-level flow control 149 closed bool 150 goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received 151 streams map[uint32]*clientStream // client-initiated 152 nextStreamID uint32 153 bw *bufio.Writer 154 br *bufio.Reader 155 fr *Framer 156 // Settings from peer: 157 maxFrameSize uint32 158 maxConcurrentStreams uint32 159 initialWindowSize uint32 160 hbuf bytes.Buffer // HPACK encoder writes into this 161 henc *hpack.Encoder 162 freeBuf [][]byte 163 164 wmu sync.Mutex // held while writing; acquire AFTER mu if holding both 165 werr error // first write error that has occurred 166} 167 168// clientStream is the state for a single HTTP/2 stream. One of these 169// is created for each Transport.RoundTrip call. 
type clientStream struct {
	cc            *ClientConn   // connection this stream belongs to
	req           *http.Request // request being served on this stream
	ID            uint32        // HTTP/2 stream identifier
	resc          chan resAndError
	bufPipe       pipe // buffered pipe with the flow-controlled response payload
	requestedGzip bool

	flow        flow  // guarded by cc.mu
	inflow      flow  // guarded by cc.mu
	bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
	readErr     error // sticky read error; owned by transportResponseBody.Read
	stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu

	peerReset chan struct{} // closed on peer reset
	resetErr  error         // populated before peerReset is closed

	done chan struct{} // closed when stream is removed from cc.streams map; close calls guarded by cc.mu

	// owned by clientConnReadLoop:
	pastHeaders  bool // got first MetaHeadersFrame (actual headers)
	pastTrailers bool // got optional second MetaHeadersFrame (trailers)

	trailer    http.Header  // accumulated trailers
	resTrailer *http.Header // client's Response.Trailer
}

// awaitRequestCancel runs in its own goroutine and waits for the user
// to either cancel a RoundTrip request (using the provided
// Request.Cancel channel), or for the request to be done (any way it
// might be removed from the cc.streams map: peer reset, successful
// completion, TCP connection breakage, etc)
//
// A nil cancel channel makes this a no-op: the function returns
// immediately rather than waiting on cs.done.
func (cs *clientStream) awaitRequestCancel(cancel <-chan struct{}) {
	if cancel == nil {
		return
	}
	select {
	case <-cancel:
		// User canceled: fail any pending body reads, then tell the
		// peer we are no longer interested in this stream.
		cs.bufPipe.CloseWithError(errRequestCanceled)
		cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
	case <-cs.done:
		// Stream finished on its own; nothing to cancel.
	}
}

// checkReset reports any error sent in a RST_STREAM frame by the
// server.
216func (cs *clientStream) checkReset() error { 217 select { 218 case <-cs.peerReset: 219 return cs.resetErr 220 default: 221 return nil 222 } 223} 224 225func (cs *clientStream) abortRequestBodyWrite(err error) { 226 if err == nil { 227 panic("nil error") 228 } 229 cc := cs.cc 230 cc.mu.Lock() 231 cs.stopReqBody = err 232 cc.cond.Broadcast() 233 cc.mu.Unlock() 234} 235 236type stickyErrWriter struct { 237 w io.Writer 238 err *error 239} 240 241func (sew stickyErrWriter) Write(p []byte) (n int, err error) { 242 if *sew.err != nil { 243 return 0, *sew.err 244 } 245 n, err = sew.w.Write(p) 246 *sew.err = err 247 return 248} 249 250var ErrNoCachedConn = errors.New("http2: no cached connection was available") 251 252// RoundTripOpt are options for the Transport.RoundTripOpt method. 253type RoundTripOpt struct { 254 // OnlyCachedConn controls whether RoundTripOpt may 255 // create a new TCP connection. If set true and 256 // no cached connection is available, RoundTripOpt 257 // will return ErrNoCachedConn. 258 OnlyCachedConn bool 259} 260 261func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { 262 return t.RoundTripOpt(req, RoundTripOpt{}) 263} 264 265// authorityAddr returns a given authority (a host/IP, or host:port / ip:port) 266// and returns a host:port. The port 443 is added if needed. 267func authorityAddr(authority string) (addr string) { 268 if _, _, err := net.SplitHostPort(authority); err == nil { 269 return authority 270 } 271 return net.JoinHostPort(authority, "443") 272} 273 274// RoundTripOpt is like RoundTrip, but takes options. 
275func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) { 276 if req.URL.Scheme != "https" { 277 return nil, errors.New("http2: unsupported scheme") 278 } 279 280 addr := authorityAddr(req.URL.Host) 281 for { 282 cc, err := t.connPool().GetClientConn(req, addr) 283 if err != nil { 284 t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) 285 return nil, err 286 } 287 res, err := cc.RoundTrip(req) 288 if shouldRetryRequest(req, err) { 289 continue 290 } 291 if err != nil { 292 t.vlogf("RoundTrip failure: %v", err) 293 return nil, err 294 } 295 return res, nil 296 } 297} 298 299// CloseIdleConnections closes any connections which were previously 300// connected from previous requests but are now sitting idle. 301// It does not interrupt any connections currently in use. 302func (t *Transport) CloseIdleConnections() { 303 if cp, ok := t.connPool().(*clientConnPool); ok { 304 cp.closeIdleConnections() 305 } 306} 307 308var ( 309 errClientConnClosed = errors.New("http2: client conn is closed") 310 errClientConnUnusable = errors.New("http2: client conn not usable") 311) 312 313func shouldRetryRequest(req *http.Request, err error) bool { 314 // TODO: retry GET requests (no bodies) more aggressively, if shutdown 315 // before response. 316 return err == errClientConnUnusable 317} 318 319func (t *Transport) dialClientConn(addr string) (*ClientConn, error) { 320 host, _, err := net.SplitHostPort(addr) 321 if err != nil { 322 return nil, err 323 } 324 tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host)) 325 if err != nil { 326 return nil, err 327 } 328 return t.NewClientConn(tconn) 329} 330 331func (t *Transport) newTLSConfig(host string) *tls.Config { 332 cfg := new(tls.Config) 333 if t.TLSClientConfig != nil { 334 *cfg = *t.TLSClientConfig 335 } 336 if !strSliceContains(cfg.NextProtos, NextProtoTLS) { 337 cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...) 
338 } 339 if cfg.ServerName == "" { 340 cfg.ServerName = host 341 } 342 return cfg 343} 344 345func (t *Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) { 346 if t.DialTLS != nil { 347 return t.DialTLS 348 } 349 return t.dialTLSDefault 350} 351 352func (t *Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) { 353 cn, err := tls.Dial(network, addr, cfg) 354 if err != nil { 355 return nil, err 356 } 357 if err := cn.Handshake(); err != nil { 358 return nil, err 359 } 360 if !cfg.InsecureSkipVerify { 361 if err := cn.VerifyHostname(cfg.ServerName); err != nil { 362 return nil, err 363 } 364 } 365 state := cn.ConnectionState() 366 if p := state.NegotiatedProtocol; p != NextProtoTLS { 367 return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS) 368 } 369 if !state.NegotiatedProtocolIsMutual { 370 return nil, errors.New("http2: could not negotiate protocol mutually") 371 } 372 return cn, nil 373} 374 375// disableKeepAlives reports whether connections should be closed as 376// soon as possible after handling the first request. 377func (t *Transport) disableKeepAlives() bool { 378 return t.t1 != nil && t.t1.DisableKeepAlives 379} 380 381func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) { 382 if VerboseLogs { 383 t.vlogf("http2: Transport creating client conn to %v", c.RemoteAddr()) 384 } 385 if _, err := c.Write(clientPreface); err != nil { 386 t.vlogf("client preface write error: %v", err) 387 return nil, err 388 } 389 390 cc := &ClientConn{ 391 t: t, 392 tconn: c, 393 readerDone: make(chan struct{}), 394 nextStreamID: 1, 395 maxFrameSize: 16 << 10, // spec default 396 initialWindowSize: 65535, // spec default 397 maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. 
398 streams: make(map[uint32]*clientStream), 399 } 400 cc.cond = sync.NewCond(&cc.mu) 401 cc.flow.add(int32(initialWindowSize)) 402 403 // TODO: adjust this writer size to account for frame size + 404 // MTU + crypto/tls record padding. 405 cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr}) 406 cc.br = bufio.NewReader(c) 407 cc.fr = NewFramer(cc.bw, cc.br) 408 cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) 409 cc.fr.MaxHeaderListSize = t.maxHeaderListSize() 410 411 // TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on 412 // henc in response to SETTINGS frames? 413 cc.henc = hpack.NewEncoder(&cc.hbuf) 414 415 if cs, ok := c.(connectionStater); ok { 416 state := cs.ConnectionState() 417 cc.tlsState = &state 418 } 419 420 initialSettings := []Setting{ 421 {ID: SettingEnablePush, Val: 0}, 422 {ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow}, 423 } 424 if max := t.maxHeaderListSize(); max != 0 { 425 initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max}) 426 } 427 cc.fr.WriteSettings(initialSettings...) 428 cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow) 429 cc.inflow.add(transportDefaultConnFlow + initialWindowSize) 430 cc.bw.Flush() 431 if cc.werr != nil { 432 return nil, cc.werr 433 } 434 435 // Read the obligatory SETTINGS frame 436 f, err := cc.fr.ReadFrame() 437 if err != nil { 438 return nil, err 439 } 440 sf, ok := f.(*SettingsFrame) 441 if !ok { 442 return nil, fmt.Errorf("expected settings frame, got: %T", f) 443 } 444 cc.fr.WriteSettingsAck() 445 cc.bw.Flush() 446 447 sf.ForeachSetting(func(s Setting) error { 448 switch s.ID { 449 case SettingMaxFrameSize: 450 cc.maxFrameSize = s.Val 451 case SettingMaxConcurrentStreams: 452 cc.maxConcurrentStreams = s.Val 453 case SettingInitialWindowSize: 454 cc.initialWindowSize = s.Val 455 default: 456 // TODO(bradfitz): handle more; at least SETTINGS_HEADER_TABLE_SIZE? 
457 t.vlogf("Unhandled Setting: %v", s) 458 } 459 return nil 460 }) 461 462 go cc.readLoop() 463 return cc, nil 464} 465 466func (cc *ClientConn) setGoAway(f *GoAwayFrame) { 467 cc.mu.Lock() 468 defer cc.mu.Unlock() 469 cc.goAway = f 470} 471 472func (cc *ClientConn) CanTakeNewRequest() bool { 473 cc.mu.Lock() 474 defer cc.mu.Unlock() 475 return cc.canTakeNewRequestLocked() 476} 477 478func (cc *ClientConn) canTakeNewRequestLocked() bool { 479 return cc.goAway == nil && !cc.closed && 480 int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && 481 cc.nextStreamID < 2147483647 482} 483 484func (cc *ClientConn) closeIfIdle() { 485 cc.mu.Lock() 486 if len(cc.streams) > 0 { 487 cc.mu.Unlock() 488 return 489 } 490 cc.closed = true 491 // TODO: do clients send GOAWAY too? maybe? Just Close: 492 cc.mu.Unlock() 493 494 cc.tconn.Close() 495} 496 497const maxAllocFrameSize = 512 << 10 498 499// frameBuffer returns a scratch buffer suitable for writing DATA frames. 500// They're capped at the min of the peer's max frame size or 512KB 501// (kinda arbitrarily), but definitely capped so we don't allocate 4GB 502// bufers. 503func (cc *ClientConn) frameScratchBuffer() []byte { 504 cc.mu.Lock() 505 size := cc.maxFrameSize 506 if size > maxAllocFrameSize { 507 size = maxAllocFrameSize 508 } 509 for i, buf := range cc.freeBuf { 510 if len(buf) >= int(size) { 511 cc.freeBuf[i] = nil 512 cc.mu.Unlock() 513 return buf[:size] 514 } 515 } 516 cc.mu.Unlock() 517 return make([]byte, size) 518} 519 520func (cc *ClientConn) putFrameScratchBuffer(buf []byte) { 521 cc.mu.Lock() 522 defer cc.mu.Unlock() 523 const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate. 524 if len(cc.freeBuf) < maxBufs { 525 cc.freeBuf = append(cc.freeBuf, buf) 526 return 527 } 528 for i, old := range cc.freeBuf { 529 if old == nil { 530 cc.freeBuf[i] = buf 531 return 532 } 533 } 534 // forget about it. 
535} 536 537// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not 538// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests. 539var errRequestCanceled = errors.New("net/http: request canceled") 540 541func commaSeparatedTrailers(req *http.Request) (string, error) { 542 keys := make([]string, 0, len(req.Trailer)) 543 for k := range req.Trailer { 544 k = http.CanonicalHeaderKey(k) 545 switch k { 546 case "Transfer-Encoding", "Trailer", "Content-Length": 547 return "", &badStringError{"invalid Trailer key", k} 548 } 549 keys = append(keys, k) 550 } 551 if len(keys) > 0 { 552 sort.Strings(keys) 553 // TODO: could do better allocation-wise here, but trailers are rare, 554 // so being lazy for now. 555 return strings.Join(keys, ","), nil 556 } 557 return "", nil 558} 559 560func (cc *ClientConn) responseHeaderTimeout() time.Duration { 561 if cc.t.t1 != nil { 562 return cc.t.t1.ResponseHeaderTimeout 563 } 564 // No way to do this (yet?) with just an http2.Transport. Probably 565 // no need. Request.Cancel this is the new way. We only need to support 566 // this for compatibility with the old http.Transport fields when 567 // we're doing transparent http2. 568 return 0 569} 570 571// checkConnHeaders checks whether req has any invalid connection-level headers. 572// per RFC 7540 section 8.1.2.2: Connection-Specific Header Fields. 573// Certain headers are special-cased as okay but not transmitted later. 
574func checkConnHeaders(req *http.Request) error { 575 if v := req.Header.Get("Upgrade"); v != "" { 576 return errors.New("http2: invalid Upgrade request header") 577 } 578 if v := req.Header.Get("Transfer-Encoding"); (v != "" && v != "chunked") || len(req.Header["Transfer-Encoding"]) > 1 { 579 return errors.New("http2: invalid Transfer-Encoding request header") 580 } 581 if v := req.Header.Get("Connection"); (v != "" && v != "close" && v != "keep-alive") || len(req.Header["Connection"]) > 1 { 582 return errors.New("http2: invalid Connection request header") 583 } 584 return nil 585} 586 587func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) { 588 if err := checkConnHeaders(req); err != nil { 589 return nil, err 590 } 591 592 trailers, err := commaSeparatedTrailers(req) 593 if err != nil { 594 return nil, err 595 } 596 hasTrailers := trailers != "" 597 598 var body io.Reader = req.Body 599 contentLen := req.ContentLength 600 if req.Body != nil && contentLen == 0 { 601 // Test to see if it's actually zero or just unset. 602 var buf [1]byte 603 n, rerr := io.ReadFull(body, buf[:]) 604 if rerr != nil && rerr != io.EOF { 605 contentLen = -1 606 body = errorReader{rerr} 607 } else if n == 1 { 608 // Oh, guess there is data in this Body Reader after all. 609 // The ContentLength field just wasn't set. 610 // Stich the Body back together again, re-attaching our 611 // consumed byte. 612 contentLen = -1 613 body = io.MultiReader(bytes.NewReader(buf[:]), body) 614 } else { 615 // Body is actually empty. 616 body = nil 617 } 618 } 619 620 cc.mu.Lock() 621 if cc.closed || !cc.canTakeNewRequestLocked() { 622 cc.mu.Unlock() 623 return nil, errClientConnUnusable 624 } 625 626 cs := cc.newStream() 627 cs.req = req 628 hasBody := body != nil 629 630 // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? 
631 if !cc.t.disableCompression() && 632 req.Header.Get("Accept-Encoding") == "" && 633 req.Header.Get("Range") == "" && 634 req.Method != "HEAD" { 635 // Request gzip only, not deflate. Deflate is ambiguous and 636 // not as universally supported anyway. 637 // See: http://www.gzip.org/zlib/zlib_faq.html#faq38 638 // 639 // Note that we don't request this for HEAD requests, 640 // due to a bug in nginx: 641 // http://trac.nginx.org/nginx/ticket/358 642 // https://golang.org/issue/5522 643 // 644 // We don't request gzip if the request is for a range, since 645 // auto-decoding a portion of a gzipped document will just fail 646 // anyway. See https://golang.org/issue/8923 647 cs.requestedGzip = true 648 } 649 650 // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is 651 // sent by writeRequestBody below, along with any Trailers, 652 // again in form HEADERS{1}, CONTINUATION{0,}) 653 hdrs := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen) 654 cc.wmu.Lock() 655 endStream := !hasBody && !hasTrailers 656 werr := cc.writeHeaders(cs.ID, endStream, hdrs) 657 cc.wmu.Unlock() 658 cc.mu.Unlock() 659 660 if werr != nil { 661 if hasBody { 662 req.Body.Close() // per RoundTripper contract 663 } 664 cc.forgetStreamID(cs.ID) 665 // Don't bother sending a RST_STREAM (our write already failed; 666 // no need to keep writing) 667 return nil, werr 668 } 669 670 var respHeaderTimer <-chan time.Time 671 var bodyCopyErrc chan error // result of body copy 672 if hasBody { 673 bodyCopyErrc = make(chan error, 1) 674 go func() { 675 bodyCopyErrc <- cs.writeRequestBody(body, req.Body) 676 }() 677 } else { 678 if d := cc.responseHeaderTimeout(); d != 0 { 679 timer := time.NewTimer(d) 680 defer timer.Stop() 681 respHeaderTimer = timer.C 682 } 683 } 684 685 readLoopResCh := cs.resc 686 requestCanceledCh := requestCancel(req) 687 bodyWritten := false 688 689 for { 690 select { 691 case re := <-readLoopResCh: 692 res := re.res 693 if re.err != nil || res.StatusCode > 299 { 
694 // On error or status code 3xx, 4xx, 5xx, etc abort any 695 // ongoing write, assuming that the server doesn't care 696 // about our request body. If the server replied with 1xx or 697 // 2xx, however, then assume the server DOES potentially 698 // want our body (e.g. full-duplex streaming: 699 // golang.org/issue/13444). If it turns out the server 700 // doesn't, they'll RST_STREAM us soon enough. This is a 701 // heuristic to avoid adding knobs to Transport. Hopefully 702 // we can keep it. 703 cs.abortRequestBodyWrite(errStopReqBodyWrite) 704 } 705 if re.err != nil { 706 cc.forgetStreamID(cs.ID) 707 return nil, re.err 708 } 709 res.Request = req 710 res.TLS = cc.tlsState 711 return res, nil 712 case <-respHeaderTimer: 713 cc.forgetStreamID(cs.ID) 714 if !hasBody || bodyWritten { 715 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) 716 } else { 717 cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) 718 } 719 return nil, errTimeout 720 case <-requestCanceledCh: 721 cc.forgetStreamID(cs.ID) 722 if !hasBody || bodyWritten { 723 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) 724 } else { 725 cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) 726 } 727 return nil, errRequestCanceled 728 case <-cs.peerReset: 729 // processResetStream already removed the 730 // stream from the streams map; no need for 731 // forgetStreamID. 
732 return nil, cs.resetErr 733 case err := <-bodyCopyErrc: 734 if err != nil { 735 return nil, err 736 } 737 bodyWritten = true 738 if d := cc.responseHeaderTimeout(); d != 0 { 739 timer := time.NewTimer(d) 740 defer timer.Stop() 741 respHeaderTimer = timer.C 742 } 743 } 744 } 745} 746 747// requires cc.wmu be held 748func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error { 749 first := true // first frame written (HEADERS is first, then CONTINUATION) 750 frameSize := int(cc.maxFrameSize) 751 for len(hdrs) > 0 && cc.werr == nil { 752 chunk := hdrs 753 if len(chunk) > frameSize { 754 chunk = chunk[:frameSize] 755 } 756 hdrs = hdrs[len(chunk):] 757 endHeaders := len(hdrs) == 0 758 if first { 759 cc.fr.WriteHeaders(HeadersFrameParam{ 760 StreamID: streamID, 761 BlockFragment: chunk, 762 EndStream: endStream, 763 EndHeaders: endHeaders, 764 }) 765 first = false 766 } else { 767 cc.fr.WriteContinuation(streamID, endHeaders, chunk) 768 } 769 } 770 // TODO(bradfitz): this Flush could potentially block (as 771 // could the WriteHeaders call(s) above), which means they 772 // wouldn't respond to Request.Cancel being readable. That's 773 // rare, but this should probably be in a goroutine. 774 cc.bw.Flush() 775 return cc.werr 776} 777 778// internal error values; they don't escape to callers 779var ( 780 // abort request body write; don't send cancel 781 errStopReqBodyWrite = errors.New("http2: aborting request body write") 782 783 // abort request body write, but send stream reset of cancel. 
784 errStopReqBodyWriteAndCancel = errors.New("http2: canceling request") 785) 786 787func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { 788 cc := cs.cc 789 sentEnd := false // whether we sent the final DATA frame w/ END_STREAM 790 buf := cc.frameScratchBuffer() 791 defer cc.putFrameScratchBuffer(buf) 792 793 defer func() { 794 // TODO: write h12Compare test showing whether 795 // Request.Body is closed by the Transport, 796 // and in multiple cases: server replies <=299 and >299 797 // while still writing request body 798 cerr := bodyCloser.Close() 799 if err == nil { 800 err = cerr 801 } 802 }() 803 804 req := cs.req 805 hasTrailers := req.Trailer != nil 806 807 var sawEOF bool 808 for !sawEOF { 809 n, err := body.Read(buf) 810 if err == io.EOF { 811 sawEOF = true 812 err = nil 813 } else if err != nil { 814 return err 815 } 816 817 remain := buf[:n] 818 for len(remain) > 0 && err == nil { 819 var allowed int32 820 allowed, err = cs.awaitFlowControl(len(remain)) 821 switch { 822 case err == errStopReqBodyWrite: 823 return err 824 case err == errStopReqBodyWriteAndCancel: 825 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) 826 return err 827 case err != nil: 828 return err 829 } 830 cc.wmu.Lock() 831 data := remain[:allowed] 832 remain = remain[allowed:] 833 sentEnd = sawEOF && len(remain) == 0 && !hasTrailers 834 err = cc.fr.WriteData(cs.ID, sentEnd, data) 835 if err == nil { 836 // TODO(bradfitz): this flush is for latency, not bandwidth. 837 // Most requests won't need this. Make this opt-in or opt-out? 838 // Use some heuristic on the body type? Nagel-like timers? 839 // Based on 'n'? Only last chunk of this for loop, unless flow control 840 // tokens are low? 
For now, always: 841 err = cc.bw.Flush() 842 } 843 cc.wmu.Unlock() 844 } 845 if err != nil { 846 return err 847 } 848 } 849 850 cc.wmu.Lock() 851 if !sentEnd { 852 var trls []byte 853 if hasTrailers { 854 cc.mu.Lock() 855 trls = cc.encodeTrailers(req) 856 cc.mu.Unlock() 857 } 858 859 // Avoid forgetting to send an END_STREAM if the encoded 860 // trailers are 0 bytes. Both results produce and END_STREAM. 861 if len(trls) > 0 { 862 err = cc.writeHeaders(cs.ID, true, trls) 863 } else { 864 err = cc.fr.WriteData(cs.ID, true, nil) 865 } 866 } 867 if ferr := cc.bw.Flush(); ferr != nil && err == nil { 868 err = ferr 869 } 870 cc.wmu.Unlock() 871 872 return err 873} 874 875// awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow 876// control tokens from the server. 877// It returns either the non-zero number of tokens taken or an error 878// if the stream is dead. 879func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) { 880 cc := cs.cc 881 cc.mu.Lock() 882 defer cc.mu.Unlock() 883 for { 884 if cc.closed { 885 return 0, errClientConnClosed 886 } 887 if cs.stopReqBody != nil { 888 return 0, cs.stopReqBody 889 } 890 if err := cs.checkReset(); err != nil { 891 return 0, err 892 } 893 if a := cs.flow.available(); a > 0 { 894 take := a 895 if int(take) > maxBytes { 896 897 take = int32(maxBytes) // can't truncate int; take is int32 898 } 899 if take > int32(cc.maxFrameSize) { 900 take = int32(cc.maxFrameSize) 901 } 902 cs.flow.take(take) 903 return take, nil 904 } 905 cc.cond.Wait() 906 } 907} 908 909type badStringError struct { 910 what string 911 str string 912} 913 914func (e *badStringError) Error() string { return fmt.Sprintf("%s %q", e.what, e.str) } 915 916// requires cc.mu be held. 
917func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) []byte { 918 cc.hbuf.Reset() 919 920 host := req.Host 921 if host == "" { 922 host = req.URL.Host 923 } 924 925 // 8.1.2.3 Request Pseudo-Header Fields 926 // The :path pseudo-header field includes the path and query parts of the 927 // target URI (the path-absolute production and optionally a '?' character 928 // followed by the query production (see Sections 3.3 and 3.4 of 929 // [RFC3986]). 930 cc.writeHeader(":authority", host) 931 cc.writeHeader(":method", req.Method) 932 if req.Method != "CONNECT" { 933 cc.writeHeader(":path", req.URL.RequestURI()) 934 cc.writeHeader(":scheme", "https") 935 } 936 if trailers != "" { 937 cc.writeHeader("trailer", trailers) 938 } 939 940 var didUA bool 941 for k, vv := range req.Header { 942 lowKey := strings.ToLower(k) 943 switch lowKey { 944 case "host", "content-length": 945 // Host is :authority, already sent. 946 // Content-Length is automatic, set below. 947 continue 948 case "connection", "proxy-connection", "transfer-encoding", "upgrade": 949 // Per 8.1.2.2 Connection-Specific Header 950 // Fields, don't send connection-specific 951 // fields. We deal with these earlier in 952 // RoundTrip, deciding whether they're 953 // error-worthy, but we don't want to mutate 954 // the user's *Request so at this point, just 955 // skip over them at this point. 956 continue 957 case "user-agent": 958 // Match Go's http1 behavior: at most one 959 // User-Agent. If set to nil or empty string, 960 // then omit it. Otherwise if not mentioned, 961 // include the default (below). 
962 didUA = true 963 if len(vv) < 1 { 964 continue 965 } 966 vv = vv[:1] 967 if vv[0] == "" { 968 continue 969 } 970 } 971 for _, v := range vv { 972 cc.writeHeader(lowKey, v) 973 } 974 } 975 if shouldSendReqContentLength(req.Method, contentLength) { 976 cc.writeHeader("content-length", strconv.FormatInt(contentLength, 10)) 977 } 978 if addGzipHeader { 979 cc.writeHeader("accept-encoding", "gzip") 980 } 981 if !didUA { 982 cc.writeHeader("user-agent", defaultUserAgent) 983 } 984 return cc.hbuf.Bytes() 985} 986 987// shouldSendReqContentLength reports whether the http2.Transport should send 988// a "content-length" request header. This logic is basically a copy of the net/http 989// transferWriter.shouldSendContentLength. 990// The contentLength is the corrected contentLength (so 0 means actually 0, not unknown). 991// -1 means unknown. 992func shouldSendReqContentLength(method string, contentLength int64) bool { 993 if contentLength > 0 { 994 return true 995 } 996 if contentLength < 0 { 997 return false 998 } 999 // For zero bodies, whether we send a content-length depends on the method. 1000 // It also kinda doesn't matter for http2 either way, with END_STREAM. 1001 switch method { 1002 case "POST", "PUT", "PATCH": 1003 return true 1004 default: 1005 return false 1006 } 1007} 1008 1009// requires cc.mu be held. 1010func (cc *ClientConn) encodeTrailers(req *http.Request) []byte { 1011 cc.hbuf.Reset() 1012 for k, vv := range req.Trailer { 1013 // Transfer-Encoding, etc.. 
have already been filter at the 1014 // start of RoundTrip 1015 lowKey := strings.ToLower(k) 1016 for _, v := range vv { 1017 cc.writeHeader(lowKey, v) 1018 } 1019 } 1020 return cc.hbuf.Bytes() 1021} 1022 1023func (cc *ClientConn) writeHeader(name, value string) { 1024 if VerboseLogs { 1025 log.Printf("http2: Transport encoding header %q = %q", name, value) 1026 } 1027 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value}) 1028} 1029 1030type resAndError struct { 1031 res *http.Response 1032 err error 1033} 1034 1035// requires cc.mu be held. 1036func (cc *ClientConn) newStream() *clientStream { 1037 cs := &clientStream{ 1038 cc: cc, 1039 ID: cc.nextStreamID, 1040 resc: make(chan resAndError, 1), 1041 peerReset: make(chan struct{}), 1042 done: make(chan struct{}), 1043 } 1044 cs.flow.add(int32(cc.initialWindowSize)) 1045 cs.flow.setConnFlow(&cc.flow) 1046 cs.inflow.add(transportDefaultStreamFlow) 1047 cs.inflow.setConnFlow(&cc.inflow) 1048 cc.nextStreamID += 2 1049 cc.streams[cs.ID] = cs 1050 return cs 1051} 1052 1053func (cc *ClientConn) forgetStreamID(id uint32) { 1054 cc.streamByID(id, true) 1055} 1056 1057func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream { 1058 cc.mu.Lock() 1059 defer cc.mu.Unlock() 1060 cs := cc.streams[id] 1061 if andRemove && cs != nil && !cc.closed { 1062 delete(cc.streams, id) 1063 close(cs.done) 1064 } 1065 return cs 1066} 1067 1068// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop. 1069type clientConnReadLoop struct { 1070 cc *ClientConn 1071 activeRes map[uint32]*clientStream // keyed by streamID 1072 closeWhenIdle bool 1073} 1074 1075// readLoop runs in its own goroutine and reads and dispatches frames. 
1076func (cc *ClientConn) readLoop() { 1077 rl := &clientConnReadLoop{ 1078 cc: cc, 1079 activeRes: make(map[uint32]*clientStream), 1080 } 1081 1082 defer rl.cleanup() 1083 cc.readerErr = rl.run() 1084 if ce, ok := cc.readerErr.(ConnectionError); ok { 1085 cc.wmu.Lock() 1086 cc.fr.WriteGoAway(0, ErrCode(ce), nil) 1087 cc.wmu.Unlock() 1088 } 1089} 1090 1091func (rl *clientConnReadLoop) cleanup() { 1092 cc := rl.cc 1093 defer cc.tconn.Close() 1094 defer cc.t.connPool().MarkDead(cc) 1095 defer close(cc.readerDone) 1096 1097 // Close any response bodies if the server closes prematurely. 1098 // TODO: also do this if we've written the headers but not 1099 // gotten a response yet. 1100 err := cc.readerErr 1101 if err == io.EOF { 1102 err = io.ErrUnexpectedEOF 1103 } 1104 cc.mu.Lock() 1105 for _, cs := range rl.activeRes { 1106 cs.bufPipe.CloseWithError(err) 1107 } 1108 for _, cs := range cc.streams { 1109 select { 1110 case cs.resc <- resAndError{err: err}: 1111 default: 1112 } 1113 close(cs.done) 1114 } 1115 cc.closed = true 1116 cc.cond.Broadcast() 1117 cc.mu.Unlock() 1118} 1119 1120func (rl *clientConnReadLoop) run() error { 1121 cc := rl.cc 1122 rl.closeWhenIdle = cc.t.disableKeepAlives() 1123 gotReply := false // ever saw a reply 1124 for { 1125 f, err := cc.fr.ReadFrame() 1126 if err != nil { 1127 cc.vlogf("Transport readFrame error: (%T) %v", err, err) 1128 } 1129 if se, ok := err.(StreamError); ok { 1130 if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil { 1131 rl.endStreamError(cs, cc.fr.errDetail) 1132 } 1133 continue 1134 } else if err != nil { 1135 return err 1136 } 1137 if VerboseLogs { 1138 cc.vlogf("http2: Transport received %s", summarizeFrame(f)) 1139 } 1140 maybeIdle := false // whether frame might transition us to idle 1141 1142 switch f := f.(type) { 1143 case *MetaHeadersFrame: 1144 err = rl.processHeaders(f) 1145 maybeIdle = true 1146 gotReply = true 1147 case *DataFrame: 1148 err = rl.processData(f) 1149 maybeIdle = true 
1150 case *GoAwayFrame: 1151 err = rl.processGoAway(f) 1152 maybeIdle = true 1153 case *RSTStreamFrame: 1154 err = rl.processResetStream(f) 1155 maybeIdle = true 1156 case *SettingsFrame: 1157 err = rl.processSettings(f) 1158 case *PushPromiseFrame: 1159 err = rl.processPushPromise(f) 1160 case *WindowUpdateFrame: 1161 err = rl.processWindowUpdate(f) 1162 case *PingFrame: 1163 err = rl.processPing(f) 1164 default: 1165 cc.logf("Transport: unhandled response frame type %T", f) 1166 } 1167 if err != nil { 1168 return err 1169 } 1170 if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 { 1171 cc.closeIfIdle() 1172 } 1173 } 1174} 1175 1176func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error { 1177 cc := rl.cc 1178 cs := cc.streamByID(f.StreamID, f.StreamEnded()) 1179 if cs == nil { 1180 // We'd get here if we canceled a request while the 1181 // server had its response still in flight. So if this 1182 // was just something we canceled, ignore it. 1183 return nil 1184 } 1185 if !cs.pastHeaders { 1186 cs.pastHeaders = true 1187 } else { 1188 return rl.processTrailers(cs, f) 1189 } 1190 1191 res, err := rl.handleResponse(cs, f) 1192 if err != nil { 1193 if _, ok := err.(ConnectionError); ok { 1194 return err 1195 } 1196 // Any other error type is a stream error. 1197 cs.cc.writeStreamReset(f.StreamID, ErrCodeProtocol, err) 1198 cs.resc <- resAndError{err: err} 1199 return nil // return nil from process* funcs to keep conn alive 1200 } 1201 if res == nil { 1202 // (nil, nil) special case. See handleResponse docs. 1203 return nil 1204 } 1205 if res.Body != noBody { 1206 rl.activeRes[cs.ID] = cs 1207 } 1208 cs.resTrailer = &res.Trailer 1209 cs.resc <- resAndError{res: res} 1210 return nil 1211} 1212 1213// may return error types nil, or ConnectionError. Any other error value 1214// is a StreamError of type ErrCodeProtocol. The returned error in that case 1215// is the detail. 
1216// 1217// As a special case, handleResponse may return (nil, nil) to skip the 1218// frame (currently only used for 100 expect continue). This special 1219// case is going away after Issue 13851 is fixed. 1220func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) { 1221 if f.Truncated { 1222 return nil, errResponseHeaderListSize 1223 } 1224 1225 status := f.PseudoValue("status") 1226 if status == "" { 1227 return nil, errors.New("missing status pseudo header") 1228 } 1229 statusCode, err := strconv.Atoi(status) 1230 if err != nil { 1231 return nil, errors.New("malformed non-numeric status pseudo header") 1232 } 1233 1234 if statusCode == 100 { 1235 // Just skip 100-continue response headers for now. 1236 // TODO: golang.org/issue/13851 for doing it properly. 1237 cs.pastHeaders = false // do it all again 1238 return nil, nil 1239 } 1240 1241 header := make(http.Header) 1242 res := &http.Response{ 1243 Proto: "HTTP/2.0", 1244 ProtoMajor: 2, 1245 Header: header, 1246 StatusCode: statusCode, 1247 Status: status + " " + http.StatusText(statusCode), 1248 } 1249 for _, hf := range f.RegularFields() { 1250 key := http.CanonicalHeaderKey(hf.Name) 1251 if key == "Trailer" { 1252 t := res.Trailer 1253 if t == nil { 1254 t = make(http.Header) 1255 res.Trailer = t 1256 } 1257 foreachHeaderElement(hf.Value, func(v string) { 1258 t[http.CanonicalHeaderKey(v)] = nil 1259 }) 1260 } else { 1261 header[key] = append(header[key], hf.Value) 1262 } 1263 } 1264 1265 streamEnded := f.StreamEnded() 1266 if !streamEnded || cs.req.Method == "HEAD" { 1267 res.ContentLength = -1 1268 if clens := res.Header["Content-Length"]; len(clens) == 1 { 1269 if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil { 1270 res.ContentLength = clen64 1271 } else { 1272 // TODO: care? unlike http/1, it won't mess up our framing, so it's 1273 // more safe smuggling-wise to ignore. 1274 } 1275 } else if len(clens) > 1 { 1276 // TODO: care? 
unlike http/1, it won't mess up our framing, so it's 1277 // more safe smuggling-wise to ignore. 1278 } 1279 } 1280 1281 if streamEnded { 1282 res.Body = noBody 1283 return res, nil 1284 } 1285 1286 buf := new(bytes.Buffer) // TODO(bradfitz): recycle this garbage 1287 cs.bufPipe = pipe{b: buf} 1288 cs.bytesRemain = res.ContentLength 1289 res.Body = transportResponseBody{cs} 1290 go cs.awaitRequestCancel(requestCancel(cs.req)) 1291 1292 if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { 1293 res.Header.Del("Content-Encoding") 1294 res.Header.Del("Content-Length") 1295 res.ContentLength = -1 1296 res.Body = &gzipReader{body: res.Body} 1297 } 1298 return res, nil 1299} 1300 1301func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error { 1302 if cs.pastTrailers { 1303 // Too many HEADERS frames for this stream. 1304 return ConnectionError(ErrCodeProtocol) 1305 } 1306 cs.pastTrailers = true 1307 if !f.StreamEnded() { 1308 // We expect that any headers for trailers also 1309 // has END_STREAM. 1310 return ConnectionError(ErrCodeProtocol) 1311 } 1312 if len(f.PseudoFields()) > 0 { 1313 // No pseudo header fields are defined for trailers. 1314 // TODO: ConnectionError might be overly harsh? Check. 1315 return ConnectionError(ErrCodeProtocol) 1316 } 1317 1318 trailer := make(http.Header) 1319 for _, hf := range f.RegularFields() { 1320 key := http.CanonicalHeaderKey(hf.Name) 1321 trailer[key] = append(trailer[key], hf.Value) 1322 } 1323 cs.trailer = trailer 1324 1325 rl.endStream(cs) 1326 return nil 1327} 1328 1329// transportResponseBody is the concrete type of Transport.RoundTrip's 1330// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body. 1331// On Close it sends RST_STREAM if EOF wasn't already seen. 
1332type transportResponseBody struct { 1333 cs *clientStream 1334} 1335 1336func (b transportResponseBody) Read(p []byte) (n int, err error) { 1337 cs := b.cs 1338 cc := cs.cc 1339 1340 if cs.readErr != nil { 1341 return 0, cs.readErr 1342 } 1343 n, err = b.cs.bufPipe.Read(p) 1344 if cs.bytesRemain != -1 { 1345 if int64(n) > cs.bytesRemain { 1346 n = int(cs.bytesRemain) 1347 if err == nil { 1348 err = errors.New("net/http: server replied with more than declared Content-Length; truncated") 1349 cc.writeStreamReset(cs.ID, ErrCodeProtocol, err) 1350 } 1351 cs.readErr = err 1352 return int(cs.bytesRemain), err 1353 } 1354 cs.bytesRemain -= int64(n) 1355 if err == io.EOF && cs.bytesRemain > 0 { 1356 err = io.ErrUnexpectedEOF 1357 cs.readErr = err 1358 return n, err 1359 } 1360 } 1361 if n == 0 { 1362 // No flow control tokens to send back. 1363 return 1364 } 1365 1366 cc.mu.Lock() 1367 defer cc.mu.Unlock() 1368 1369 var connAdd, streamAdd int32 1370 // Check the conn-level first, before the stream-level. 1371 if v := cc.inflow.available(); v < transportDefaultConnFlow/2 { 1372 connAdd = transportDefaultConnFlow - v 1373 cc.inflow.add(connAdd) 1374 } 1375 if err == nil { // No need to refresh if the stream is over or failed. 
1376 if v := cs.inflow.available(); v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh { 1377 streamAdd = transportDefaultStreamFlow - v 1378 cs.inflow.add(streamAdd) 1379 } 1380 } 1381 if connAdd != 0 || streamAdd != 0 { 1382 cc.wmu.Lock() 1383 defer cc.wmu.Unlock() 1384 if connAdd != 0 { 1385 cc.fr.WriteWindowUpdate(0, mustUint31(connAdd)) 1386 } 1387 if streamAdd != 0 { 1388 cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd)) 1389 } 1390 cc.bw.Flush() 1391 } 1392 return 1393} 1394 1395var errClosedResponseBody = errors.New("http2: response body closed") 1396 1397func (b transportResponseBody) Close() error { 1398 cs := b.cs 1399 if cs.bufPipe.Err() != io.EOF { 1400 // TODO: write test for this 1401 cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) 1402 } 1403 cs.bufPipe.BreakWithError(errClosedResponseBody) 1404 return nil 1405} 1406 1407func (rl *clientConnReadLoop) processData(f *DataFrame) error { 1408 cc := rl.cc 1409 cs := cc.streamByID(f.StreamID, f.StreamEnded()) 1410 if cs == nil { 1411 cc.mu.Lock() 1412 neverSent := cc.nextStreamID 1413 cc.mu.Unlock() 1414 if f.StreamID >= neverSent { 1415 // We never asked for this. 1416 cc.logf("http2: Transport received unsolicited DATA frame; closing connection") 1417 return ConnectionError(ErrCodeProtocol) 1418 } 1419 // We probably did ask for this, but canceled. Just ignore it. 1420 // TODO: be stricter here? only silently ignore things which 1421 // we canceled, but not things which were closed normally 1422 // by the peer? Tough without accumulating too much state. 1423 return nil 1424 } 1425 if data := f.Data(); len(data) > 0 { 1426 if cs.bufPipe.b == nil { 1427 // Data frame after it's already closed? 1428 cc.logf("http2: Transport received DATA frame for closed stream; closing connection") 1429 return ConnectionError(ErrCodeProtocol) 1430 } 1431 1432 // Check connection-level flow control. 
1433 cc.mu.Lock() 1434 if cs.inflow.available() >= int32(len(data)) { 1435 cs.inflow.take(int32(len(data))) 1436 } else { 1437 cc.mu.Unlock() 1438 return ConnectionError(ErrCodeFlowControl) 1439 } 1440 cc.mu.Unlock() 1441 1442 if _, err := cs.bufPipe.Write(data); err != nil { 1443 rl.endStreamError(cs, err) 1444 return err 1445 } 1446 } 1447 1448 if f.StreamEnded() { 1449 rl.endStream(cs) 1450 } 1451 return nil 1452} 1453 1454var errInvalidTrailers = errors.New("http2: invalid trailers") 1455 1456func (rl *clientConnReadLoop) endStream(cs *clientStream) { 1457 // TODO: check that any declared content-length matches, like 1458 // server.go's (*stream).endStream method. 1459 rl.endStreamError(cs, nil) 1460} 1461 1462func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) { 1463 var code func() 1464 if err == nil { 1465 err = io.EOF 1466 code = cs.copyTrailers 1467 } 1468 cs.bufPipe.closeWithErrorAndCode(err, code) 1469 delete(rl.activeRes, cs.ID) 1470 if cs.req.Close || cs.req.Header.Get("Connection") == "close" { 1471 rl.closeWhenIdle = true 1472 } 1473} 1474 1475func (cs *clientStream) copyTrailers() { 1476 for k, vv := range cs.trailer { 1477 t := cs.resTrailer 1478 if *t == nil { 1479 *t = make(http.Header) 1480 } 1481 (*t)[k] = vv 1482 } 1483} 1484 1485func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error { 1486 cc := rl.cc 1487 cc.t.connPool().MarkDead(cc) 1488 if f.ErrCode != 0 { 1489 // TODO: deal with GOAWAY more. 
particularly the error code 1490 cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode) 1491 } 1492 cc.setGoAway(f) 1493 return nil 1494} 1495 1496func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error { 1497 cc := rl.cc 1498 cc.mu.Lock() 1499 defer cc.mu.Unlock() 1500 return f.ForeachSetting(func(s Setting) error { 1501 switch s.ID { 1502 case SettingMaxFrameSize: 1503 cc.maxFrameSize = s.Val 1504 case SettingMaxConcurrentStreams: 1505 cc.maxConcurrentStreams = s.Val 1506 case SettingInitialWindowSize: 1507 // TODO: error if this is too large. 1508 1509 // TODO: adjust flow control of still-open 1510 // frames by the difference of the old initial 1511 // window size and this one. 1512 cc.initialWindowSize = s.Val 1513 default: 1514 // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. 1515 cc.vlogf("Unhandled Setting: %v", s) 1516 } 1517 return nil 1518 }) 1519} 1520 1521func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error { 1522 cc := rl.cc 1523 cs := cc.streamByID(f.StreamID, false) 1524 if f.StreamID != 0 && cs == nil { 1525 return nil 1526 } 1527 1528 cc.mu.Lock() 1529 defer cc.mu.Unlock() 1530 1531 fl := &cc.flow 1532 if cs != nil { 1533 fl = &cs.flow 1534 } 1535 if !fl.add(int32(f.Increment)) { 1536 return ConnectionError(ErrCodeFlowControl) 1537 } 1538 cc.cond.Broadcast() 1539 return nil 1540} 1541 1542func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error { 1543 cs := rl.cc.streamByID(f.StreamID, true) 1544 if cs == nil { 1545 // TODO: return error if server tries to RST_STEAM an idle stream 1546 return nil 1547 } 1548 select { 1549 case <-cs.peerReset: 1550 // Already reset. 1551 // This is the only goroutine 1552 // which closes this, so there 1553 // isn't a race. 
1554 default: 1555 err := StreamError{cs.ID, f.ErrCode} 1556 cs.resetErr = err 1557 close(cs.peerReset) 1558 cs.bufPipe.CloseWithError(err) 1559 cs.cc.cond.Broadcast() // wake up checkReset via clientStream.awaitFlowControl 1560 } 1561 delete(rl.activeRes, cs.ID) 1562 return nil 1563} 1564 1565func (rl *clientConnReadLoop) processPing(f *PingFrame) error { 1566 if f.IsAck() { 1567 // 6.7 PING: " An endpoint MUST NOT respond to PING frames 1568 // containing this flag." 1569 return nil 1570 } 1571 cc := rl.cc 1572 cc.wmu.Lock() 1573 defer cc.wmu.Unlock() 1574 if err := cc.fr.WritePing(true, f.Data); err != nil { 1575 return err 1576 } 1577 return cc.bw.Flush() 1578} 1579 1580func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error { 1581 // We told the peer we don't want them. 1582 // Spec says: 1583 // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH 1584 // setting of the peer endpoint is set to 0. An endpoint that 1585 // has set this setting and has received acknowledgement MUST 1586 // treat the receipt of a PUSH_PROMISE frame as a connection 1587 // error (Section 5.4.1) of type PROTOCOL_ERROR." 1588 return ConnectionError(ErrCodeProtocol) 1589} 1590 1591func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) { 1592 // TODO: do something with err? send it as a debug frame to the peer? 1593 // But that's only in GOAWAY. Invent a new frame type? Is there one already? 1594 cc.wmu.Lock() 1595 cc.fr.WriteRSTStream(streamID, code) 1596 cc.bw.Flush() 1597 cc.wmu.Unlock() 1598} 1599 1600var ( 1601 errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit") 1602 errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers") 1603) 1604 1605func (cc *ClientConn) logf(format string, args ...interface{}) { 1606 cc.t.logf(format, args...) 1607} 1608 1609func (cc *ClientConn) vlogf(format string, args ...interface{}) { 1610 cc.t.vlogf(format, args...) 
1611} 1612 1613func (t *Transport) vlogf(format string, args ...interface{}) { 1614 if VerboseLogs { 1615 t.logf(format, args...) 1616 } 1617} 1618 1619func (t *Transport) logf(format string, args ...interface{}) { 1620 log.Printf(format, args...) 1621} 1622 1623var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) 1624 1625func strSliceContains(ss []string, s string) bool { 1626 for _, v := range ss { 1627 if v == s { 1628 return true 1629 } 1630 } 1631 return false 1632} 1633 1634type erringRoundTripper struct{ err error } 1635 1636func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err } 1637 1638// gzipReader wraps a response body so it can lazily 1639// call gzip.NewReader on the first call to Read 1640type gzipReader struct { 1641 body io.ReadCloser // underlying Response.Body 1642 zr *gzip.Reader // lazily-initialized gzip reader 1643 zerr error // sticky error 1644} 1645 1646func (gz *gzipReader) Read(p []byte) (n int, err error) { 1647 if gz.zerr != nil { 1648 return 0, gz.zerr 1649 } 1650 if gz.zr == nil { 1651 gz.zr, err = gzip.NewReader(gz.body) 1652 if err != nil { 1653 gz.zerr = err 1654 return 0, err 1655 } 1656 } 1657 return gz.zr.Read(p) 1658} 1659 1660func (gz *gzipReader) Close() error { 1661 return gz.body.Close() 1662} 1663 1664type errorReader struct{ err error } 1665 1666func (r errorReader) Read(p []byte) (int, error) { return 0, r.err } 1667