1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "io" 23 "math" 24 "net" 25 "strings" 26 "sync" 27 "sync/atomic" 28 "time" 29 30 "golang.org/x/net/context" 31 "golang.org/x/net/http2" 32 "golang.org/x/net/http2/hpack" 33 34 "google.golang.org/grpc/channelz" 35 "google.golang.org/grpc/codes" 36 "google.golang.org/grpc/credentials" 37 "google.golang.org/grpc/keepalive" 38 "google.golang.org/grpc/metadata" 39 "google.golang.org/grpc/peer" 40 "google.golang.org/grpc/stats" 41 "google.golang.org/grpc/status" 42) 43 44// http2Client implements the ClientTransport interface with HTTP2. 45type http2Client struct { 46 ctx context.Context 47 cancel context.CancelFunc 48 ctxDone <-chan struct{} // Cache the ctx.Done() chan. 49 userAgent string 50 md interface{} 51 conn net.Conn // underlying communication channel 52 loopy *loopyWriter 53 remoteAddr net.Addr 54 localAddr net.Addr 55 authInfo credentials.AuthInfo // auth info about the connection 56 57 readerDone chan struct{} // sync point to enable testing. 58 writerDone chan struct{} // sync point to enable testing. 59 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) 60 // that the server sent GoAway on this transport. 61 goAway chan struct{} 62 // awakenKeepalive is used to wake up keepalive when after it has gone dormant. 
63 awakenKeepalive chan struct{} 64 65 framer *framer 66 // controlBuf delivers all the control related tasks (e.g., window 67 // updates, reset streams, and various settings) to the controller. 68 controlBuf *controlBuffer 69 fc *trInFlow 70 // The scheme used: https if TLS is on, http otherwise. 71 scheme string 72 73 isSecure bool 74 75 creds []credentials.PerRPCCredentials 76 77 // Boolean to keep track of reading activity on transport. 78 // 1 is true and 0 is false. 79 activity uint32 // Accessed atomically. 80 kp keepalive.ClientParameters 81 82 statsHandler stats.Handler 83 84 initialWindowSize int32 85 86 bdpEst *bdpEstimator 87 // onSuccess is a callback that client transport calls upon 88 // receiving server preface to signal that a succefull HTTP2 89 // connection was established. 90 onSuccess func() 91 92 maxConcurrentStreams uint32 93 streamQuota int64 94 streamsQuotaAvailable chan struct{} 95 waitingStreams uint32 96 nextID uint32 97 98 mu sync.Mutex // guard the following variables 99 state transportState 100 activeStreams map[uint32]*Stream 101 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. 102 prevGoAwayID uint32 103 // goAwayReason records the http2.ErrCode and debug data received with the 104 // GoAway frame. 105 goAwayReason GoAwayReason 106 107 // Fields below are for channelz metric collection. 108 channelzID int64 // channelz unique identification number 109 czmu sync.RWMutex 110 kpCount int64 111 // The number of streams that have started, including already finished ones. 112 streamsStarted int64 113 // The number of streams that have ended successfully by receiving EoS bit set 114 // frame from server. 
115 streamsSucceeded int64 116 streamsFailed int64 117 lastStreamCreated time.Time 118 msgSent int64 119 msgRecv int64 120 lastMsgSent time.Time 121 lastMsgRecv time.Time 122} 123 124func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { 125 if fn != nil { 126 return fn(ctx, addr) 127 } 128 return dialContext(ctx, "tcp", addr) 129} 130 131func isTemporary(err error) bool { 132 switch err := err.(type) { 133 case interface { 134 Temporary() bool 135 }: 136 return err.Temporary() 137 case interface { 138 Timeout() bool 139 }: 140 // Timeouts may be resolved upon retry, and are thus treated as 141 // temporary. 142 return err.Timeout() 143 } 144 return true 145} 146 147// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 148// and starts to receive messages on it. Non-nil error returns if construction 149// fails. 150func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) { 151 scheme := "http" 152 ctx, cancel := context.WithCancel(ctx) 153 defer func() { 154 if err != nil { 155 cancel() 156 } 157 }() 158 159 conn, err := dial(connectCtx, opts.Dialer, addr.Addr) 160 if err != nil { 161 if opts.FailOnNonTempDialError { 162 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) 163 } 164 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) 165 } 166 // Any further errors will close the underlying connection 167 defer func(conn net.Conn) { 168 if err != nil { 169 conn.Close() 170 } 171 }(conn) 172 var ( 173 isSecure bool 174 authInfo credentials.AuthInfo 175 ) 176 if creds := opts.TransportCredentials; creds != nil { 177 scheme = "https" 178 conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Authority, conn) 179 if err != nil { 180 return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake 
failed: %v", err) 181 } 182 isSecure = true 183 } 184 kp := opts.KeepaliveParams 185 // Validate keepalive parameters. 186 if kp.Time == 0 { 187 kp.Time = defaultClientKeepaliveTime 188 } 189 if kp.Timeout == 0 { 190 kp.Timeout = defaultClientKeepaliveTimeout 191 } 192 dynamicWindow := true 193 icwz := int32(initialWindowSize) 194 if opts.InitialConnWindowSize >= defaultWindowSize { 195 icwz = opts.InitialConnWindowSize 196 dynamicWindow = false 197 } 198 writeBufSize := defaultWriteBufSize 199 if opts.WriteBufferSize > 0 { 200 writeBufSize = opts.WriteBufferSize 201 } 202 readBufSize := defaultReadBufSize 203 if opts.ReadBufferSize > 0 { 204 readBufSize = opts.ReadBufferSize 205 } 206 t := &http2Client{ 207 ctx: ctx, 208 ctxDone: ctx.Done(), // Cache Done chan. 209 cancel: cancel, 210 userAgent: opts.UserAgent, 211 md: addr.Metadata, 212 conn: conn, 213 remoteAddr: conn.RemoteAddr(), 214 localAddr: conn.LocalAddr(), 215 authInfo: authInfo, 216 readerDone: make(chan struct{}), 217 writerDone: make(chan struct{}), 218 goAway: make(chan struct{}), 219 awakenKeepalive: make(chan struct{}, 1), 220 framer: newFramer(conn, writeBufSize, readBufSize), 221 fc: &trInFlow{limit: uint32(icwz)}, 222 scheme: scheme, 223 activeStreams: make(map[uint32]*Stream), 224 isSecure: isSecure, 225 creds: opts.PerRPCCredentials, 226 kp: kp, 227 statsHandler: opts.StatsHandler, 228 initialWindowSize: initialWindowSize, 229 onSuccess: onSuccess, 230 nextID: 1, 231 maxConcurrentStreams: defaultMaxStreamsClient, 232 streamQuota: defaultMaxStreamsClient, 233 streamsQuotaAvailable: make(chan struct{}, 1), 234 } 235 t.controlBuf = newControlBuffer(t.ctxDone) 236 if opts.InitialWindowSize >= defaultWindowSize { 237 t.initialWindowSize = opts.InitialWindowSize 238 dynamicWindow = false 239 } 240 if dynamicWindow { 241 t.bdpEst = &bdpEstimator{ 242 bdp: initialWindowSize, 243 updateFlowControl: t.updateFlowControl, 244 } 245 } 246 // Make sure awakenKeepalive can't be written upon. 
247 // keepalive routine will make it writable, if need be. 248 t.awakenKeepalive <- struct{}{} 249 if t.statsHandler != nil { 250 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ 251 RemoteAddr: t.remoteAddr, 252 LocalAddr: t.localAddr, 253 }) 254 connBegin := &stats.ConnBegin{ 255 Client: true, 256 } 257 t.statsHandler.HandleConn(t.ctx, connBegin) 258 } 259 if channelz.IsOn() { 260 t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, "") 261 } 262 // Start the reader goroutine for incoming message. Each transport has 263 // a dedicated goroutine which reads HTTP2 frame from network. Then it 264 // dispatches the frame to the corresponding stream entity. 265 go t.reader() 266 // Send connection preface to server. 267 n, err := t.conn.Write(clientPreface) 268 if err != nil { 269 t.Close() 270 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) 271 } 272 if n != len(clientPreface) { 273 t.Close() 274 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) 275 } 276 if t.initialWindowSize != defaultWindowSize { 277 err = t.framer.fr.WriteSettings(http2.Setting{ 278 ID: http2.SettingInitialWindowSize, 279 Val: uint32(t.initialWindowSize), 280 }) 281 } else { 282 err = t.framer.fr.WriteSettings() 283 } 284 if err != nil { 285 t.Close() 286 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) 287 } 288 // Adjust the connection flow control window if needed. 
289 if delta := uint32(icwz - defaultWindowSize); delta > 0 { 290 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil { 291 t.Close() 292 return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) 293 } 294 } 295 t.framer.writer.Flush() 296 go func() { 297 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst) 298 t.loopy.run() 299 t.conn.Close() 300 close(t.writerDone) 301 }() 302 if t.kp.Time != infinity { 303 go t.keepalive() 304 } 305 return t, nil 306} 307 308func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { 309 // TODO(zhaoq): Handle uint32 overflow of Stream.id. 310 s := &Stream{ 311 done: make(chan struct{}), 312 method: callHdr.Method, 313 sendCompress: callHdr.SendCompress, 314 buf: newRecvBuffer(), 315 headerChan: make(chan struct{}), 316 contentSubtype: callHdr.ContentSubtype, 317 } 318 s.wq = newWriteQuota(defaultWriteQuota, s.done) 319 s.requestRead = func(n int) { 320 t.adjustWindow(s, uint32(n)) 321 } 322 // The client side stream context should have exactly the same life cycle with the user provided context. 323 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. 324 // So we use the original context here instead of creating a copy. 325 s.ctx = ctx 326 s.trReader = &transportReader{ 327 reader: &recvBufferReader{ 328 ctx: s.ctx, 329 ctxDone: s.ctx.Done(), 330 recv: s.buf, 331 }, 332 windowHandler: func(n int) { 333 t.updateWindow(s, uint32(n)) 334 }, 335 } 336 return s 337} 338 339func (t *http2Client) getPeer() *peer.Peer { 340 pr := &peer.Peer{ 341 Addr: t.remoteAddr, 342 } 343 // Attach Auth info if there is any. 
344 if t.authInfo != nil { 345 pr.AuthInfo = t.authInfo 346 } 347 return pr 348} 349 350func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) { 351 aud := t.createAudience(callHdr) 352 authData, err := t.getTrAuthData(ctx, aud) 353 if err != nil { 354 return nil, err 355 } 356 callAuthData, err := t.getCallAuthData(ctx, aud, callHdr) 357 if err != nil { 358 return nil, err 359 } 360 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 361 // first and create a slice of that exact size. 362 // Make the slice of certain predictable size to reduce allocations made by append. 363 hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te 364 hfLen += len(authData) + len(callAuthData) 365 headerFields := make([]hpack.HeaderField, 0, hfLen) 366 headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"}) 367 headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme}) 368 headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method}) 369 headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) 370 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)}) 371 headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) 372 headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"}) 373 374 if callHdr.SendCompress != "" { 375 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) 376 } 377 if dl, ok := ctx.Deadline(); ok { 378 // Send out timeout regardless its value. The server can detect timeout context by itself. 379 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire. 
380 timeout := dl.Sub(time.Now()) 381 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) 382 } 383 for k, v := range authData { 384 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 385 } 386 for k, v := range callAuthData { 387 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 388 } 389 if b := stats.OutgoingTags(ctx); b != nil { 390 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)}) 391 } 392 if b := stats.OutgoingTrace(ctx); b != nil { 393 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)}) 394 } 395 396 if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { 397 var k string 398 for _, vv := range added { 399 for i, v := range vv { 400 if i%2 == 0 { 401 k = v 402 continue 403 } 404 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. 405 if isReservedHeader(k) { 406 continue 407 } 408 headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)}) 409 } 410 } 411 for k, vv := range md { 412 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. 413 if isReservedHeader(k) { 414 continue 415 } 416 for _, v := range vv { 417 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 418 } 419 } 420 } 421 if md, ok := t.md.(*metadata.MD); ok { 422 for k, vv := range *md { 423 if isReservedHeader(k) { 424 continue 425 } 426 for _, v := range vv { 427 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 428 } 429 } 430 } 431 return headerFields, nil 432} 433 434func (t *http2Client) createAudience(callHdr *CallHdr) string { 435 // Create an audience string only if needed. 
436 if len(t.creds) == 0 && callHdr.Creds == nil { 437 return "" 438 } 439 // Construct URI required to get auth request metadata. 440 // Omit port if it is the default one. 441 host := strings.TrimSuffix(callHdr.Host, ":443") 442 pos := strings.LastIndex(callHdr.Method, "/") 443 if pos == -1 { 444 pos = len(callHdr.Method) 445 } 446 return "https://" + host + callHdr.Method[:pos] 447} 448 449func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) { 450 authData := map[string]string{} 451 for _, c := range t.creds { 452 data, err := c.GetRequestMetadata(ctx, audience) 453 if err != nil { 454 if _, ok := status.FromError(err); ok { 455 return nil, err 456 } 457 458 return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err) 459 } 460 for k, v := range data { 461 // Capital header names are illegal in HTTP/2. 462 k = strings.ToLower(k) 463 authData[k] = v 464 } 465 } 466 return authData, nil 467} 468 469func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) { 470 callAuthData := map[string]string{} 471 // Check if credentials.PerRPCCredentials were provided via call options. 472 // Note: if these credentials are provided both via dial options and call 473 // options, then both sets of credentials will be applied. 
474 if callCreds := callHdr.Creds; callCreds != nil { 475 if !t.isSecure && callCreds.RequireTransportSecurity() { 476 return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection") 477 } 478 data, err := callCreds.GetRequestMetadata(ctx, audience) 479 if err != nil { 480 return nil, streamErrorf(codes.Internal, "transport: %v", err) 481 } 482 for k, v := range data { 483 // Capital header names are illegal in HTTP/2 484 k = strings.ToLower(k) 485 callAuthData[k] = v 486 } 487 } 488 return callAuthData, nil 489} 490 491// NewStream creates a stream and registers it into the transport as "active" 492// streams. 493func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { 494 ctx = peer.NewContext(ctx, t.getPeer()) 495 headerFields, err := t.createHeaderFields(ctx, callHdr) 496 if err != nil { 497 return nil, err 498 } 499 s := t.newStream(ctx, callHdr) 500 cleanup := func(err error) { 501 if s.swapState(streamDone) == streamDone { 502 // If it was already done, return. 503 return 504 } 505 // The stream was unprocessed by the server. 506 atomic.StoreUint32(&s.unprocessed, 1) 507 s.write(recvMsg{err: err}) 508 close(s.done) 509 // If headerChan isn't closed, then close it. 510 if atomic.SwapUint32(&s.headerDone, 1) == 0 { 511 close(s.headerChan) 512 } 513 514 } 515 hdr := &headerFrame{ 516 hf: headerFields, 517 endStream: false, 518 initStream: func(id uint32) (bool, error) { 519 t.mu.Lock() 520 if state := t.state; state != reachable { 521 t.mu.Unlock() 522 // Do a quick cleanup. 
523 err := error(errStreamDrain) 524 if state == closing { 525 err = ErrConnClosing 526 } 527 cleanup(err) 528 return false, err 529 } 530 t.activeStreams[id] = s 531 if channelz.IsOn() { 532 t.czmu.Lock() 533 t.streamsStarted++ 534 t.lastStreamCreated = time.Now() 535 t.czmu.Unlock() 536 } 537 var sendPing bool 538 // If the number of active streams change from 0 to 1, then check if keepalive 539 // has gone dormant. If so, wake it up. 540 if len(t.activeStreams) == 1 { 541 select { 542 case t.awakenKeepalive <- struct{}{}: 543 sendPing = true 544 // Fill the awakenKeepalive channel again as this channel must be 545 // kept non-writable except at the point that the keepalive() 546 // goroutine is waiting either to be awaken or shutdown. 547 t.awakenKeepalive <- struct{}{} 548 default: 549 } 550 } 551 t.mu.Unlock() 552 return sendPing, nil 553 }, 554 onOrphaned: cleanup, 555 wq: s.wq, 556 } 557 firstTry := true 558 var ch chan struct{} 559 checkForStreamQuota := func(it interface{}) bool { 560 if t.streamQuota <= 0 { // Can go negative if server decreases it. 
561 if firstTry { 562 t.waitingStreams++ 563 } 564 ch = t.streamsQuotaAvailable 565 return false 566 } 567 if !firstTry { 568 t.waitingStreams-- 569 } 570 t.streamQuota-- 571 h := it.(*headerFrame) 572 h.streamID = t.nextID 573 t.nextID += 2 574 s.id = h.streamID 575 s.fc = &inFlow{limit: uint32(t.initialWindowSize)} 576 if t.streamQuota > 0 && t.waitingStreams > 0 { 577 select { 578 case t.streamsQuotaAvailable <- struct{}{}: 579 default: 580 } 581 } 582 return true 583 } 584 for { 585 success, err := t.controlBuf.executeAndPut(checkForStreamQuota, hdr) 586 if err != nil { 587 return nil, err 588 } 589 if success { 590 break 591 } 592 firstTry = false 593 select { 594 case <-ch: 595 case <-s.ctx.Done(): 596 return nil, ContextErr(s.ctx.Err()) 597 case <-t.goAway: 598 return nil, errStreamDrain 599 case <-t.ctx.Done(): 600 return nil, ErrConnClosing 601 } 602 } 603 if t.statsHandler != nil { 604 outHeader := &stats.OutHeader{ 605 Client: true, 606 FullMethod: callHdr.Method, 607 RemoteAddr: t.remoteAddr, 608 LocalAddr: t.localAddr, 609 Compression: callHdr.SendCompress, 610 } 611 t.statsHandler.HandleRPC(s.ctx, outHeader) 612 } 613 return s, nil 614} 615 616// CloseStream clears the footprint of a stream when the stream is not needed any more. 617// This must not be executed in reader's goroutine. 618func (t *http2Client) CloseStream(s *Stream, err error) { 619 var ( 620 rst bool 621 rstCode http2.ErrCode 622 ) 623 if err != nil { 624 rst = true 625 rstCode = http2.ErrCodeCancel 626 } 627 t.closeStream(s, err, rst, rstCode, nil, nil, false) 628} 629 630func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) { 631 // Set stream status to done. 632 if s.swapState(streamDone) == streamDone { 633 // If it was already done, return. 
634 return 635 } 636 // status and trailers can be updated here without any synchronization because the stream goroutine will 637 // only read it after it sees an io.EOF error from read or write and we'll write those errors 638 // only after updating this. 639 s.status = st 640 if len(mdata) > 0 { 641 s.trailer = mdata 642 } 643 if err != nil { 644 // This will unblock reads eventually. 645 s.write(recvMsg{err: err}) 646 } 647 // This will unblock write. 648 close(s.done) 649 // If headerChan isn't closed, then close it. 650 if atomic.SwapUint32(&s.headerDone, 1) == 0 { 651 close(s.headerChan) 652 } 653 cleanup := &cleanupStream{ 654 streamID: s.id, 655 onWrite: func() { 656 t.mu.Lock() 657 if t.activeStreams != nil { 658 delete(t.activeStreams, s.id) 659 } 660 t.mu.Unlock() 661 if channelz.IsOn() { 662 t.czmu.Lock() 663 if eosReceived { 664 t.streamsSucceeded++ 665 } else { 666 t.streamsFailed++ 667 } 668 t.czmu.Unlock() 669 } 670 }, 671 rst: rst, 672 rstCode: rstCode, 673 } 674 addBackStreamQuota := func(interface{}) bool { 675 t.streamQuota++ 676 if t.streamQuota > 0 && t.waitingStreams > 0 { 677 select { 678 case t.streamsQuotaAvailable <- struct{}{}: 679 default: 680 } 681 } 682 return true 683 } 684 t.controlBuf.executeAndPut(addBackStreamQuota, cleanup) 685} 686 687// Close kicks off the shutdown process of the transport. This should be called 688// only once on a transport. Once it is called, the transport should not be 689// accessed any more. 690func (t *http2Client) Close() error { 691 t.mu.Lock() 692 // Make sure we only Close once. 693 if t.state == closing { 694 t.mu.Unlock() 695 return nil 696 } 697 t.state = closing 698 streams := t.activeStreams 699 t.activeStreams = nil 700 t.mu.Unlock() 701 t.controlBuf.finish() 702 t.cancel() 703 err := t.conn.Close() 704 if channelz.IsOn() { 705 channelz.RemoveEntry(t.channelzID) 706 } 707 // Notify all active streams. 
708 for _, s := range streams { 709 t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, nil, nil, false) 710 } 711 if t.statsHandler != nil { 712 connEnd := &stats.ConnEnd{ 713 Client: true, 714 } 715 t.statsHandler.HandleConn(t.ctx, connEnd) 716 } 717 return err 718} 719 720// GracefulClose sets the state to draining, which prevents new streams from 721// being created and causes the transport to be closed when the last active 722// stream is closed. If there are no active streams, the transport is closed 723// immediately. This does nothing if the transport is already draining or 724// closing. 725func (t *http2Client) GracefulClose() error { 726 t.mu.Lock() 727 // Make sure we move to draining only from active. 728 if t.state == draining || t.state == closing { 729 t.mu.Unlock() 730 return nil 731 } 732 t.state = draining 733 active := len(t.activeStreams) 734 t.mu.Unlock() 735 if active == 0 { 736 return t.Close() 737 } 738 return nil 739} 740 741// Write formats the data into HTTP2 data frame(s) and sends it out. The caller 742// should proceed only if Write returns nil. 743func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { 744 if opts.Last { 745 // If it's the last message, update stream state. 746 if !s.compareAndSwapState(streamActive, streamWriteDone) { 747 return errStreamDone 748 } 749 } else if s.getState() != streamActive { 750 return errStreamDone 751 } 752 df := &dataFrame{ 753 streamID: s.id, 754 endStream: opts.Last, 755 } 756 if hdr != nil || data != nil { // If it's not an empty data frame. 757 // Add some data to grpc message header so that we can equally 758 // distribute bytes across frames. 759 emptyLen := http2MaxFrameLen - len(hdr) 760 if emptyLen > len(data) { 761 emptyLen = len(data) 762 } 763 hdr = append(hdr, data[:emptyLen]...) 764 data = data[emptyLen:] 765 df.h, df.d = hdr, data 766 // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler. 
767 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { 768 return err 769 } 770 } 771 return t.controlBuf.put(df) 772} 773 774func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { 775 t.mu.Lock() 776 defer t.mu.Unlock() 777 s, ok := t.activeStreams[f.Header().StreamID] 778 return s, ok 779} 780 781// adjustWindow sends out extra window update over the initial window size 782// of stream if the application is requesting data larger in size than 783// the window. 784func (t *http2Client) adjustWindow(s *Stream, n uint32) { 785 if w := s.fc.maybeAdjust(n); w > 0 { 786 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 787 } 788} 789 790// updateWindow adjusts the inbound quota for the stream. 791// Window updates will be sent out when the cumulative quota 792// exceeds the corresponding threshold. 793func (t *http2Client) updateWindow(s *Stream, n uint32) { 794 if w := s.fc.onRead(n); w > 0 { 795 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 796 } 797} 798 799// updateFlowControl updates the incoming flow control windows 800// for the transport and the stream based on the current bdp 801// estimation. 802func (t *http2Client) updateFlowControl(n uint32) { 803 t.mu.Lock() 804 for _, s := range t.activeStreams { 805 s.fc.newLimit(n) 806 } 807 t.mu.Unlock() 808 updateIWS := func(interface{}) bool { 809 t.initialWindowSize = int32(n) 810 return true 811 } 812 t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)}) 813 t.controlBuf.put(&outgoingSettings{ 814 ss: []http2.Setting{ 815 { 816 ID: http2.SettingInitialWindowSize, 817 Val: n, 818 }, 819 }, 820 }) 821} 822 823func (t *http2Client) handleData(f *http2.DataFrame) { 824 size := f.Header().Length 825 var sendBDPPing bool 826 if t.bdpEst != nil { 827 sendBDPPing = t.bdpEst.add(size) 828 } 829 // Decouple connection's flow control from application's read. 
830 // An update on connection's flow control should not depend on 831 // whether user application has read the data or not. Such a 832 // restriction is already imposed on the stream's flow control, 833 // and therefore the sender will be blocked anyways. 834 // Decoupling the connection flow control will prevent other 835 // active(fast) streams from starving in presence of slow or 836 // inactive streams. 837 // 838 if w := t.fc.onData(size); w > 0 { 839 t.controlBuf.put(&outgoingWindowUpdate{ 840 streamID: 0, 841 increment: w, 842 }) 843 } 844 if sendBDPPing { 845 // Avoid excessive ping detection (e.g. in an L7 proxy) 846 // by sending a window update prior to the BDP ping. 847 848 if w := t.fc.reset(); w > 0 { 849 t.controlBuf.put(&outgoingWindowUpdate{ 850 streamID: 0, 851 increment: w, 852 }) 853 } 854 855 t.controlBuf.put(bdpPing) 856 } 857 // Select the right stream to dispatch. 858 s, ok := t.getStream(f) 859 if !ok { 860 return 861 } 862 if size > 0 { 863 if err := s.fc.onData(size); err != nil { 864 t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false) 865 return 866 } 867 if f.Header().Flags.Has(http2.FlagDataPadded) { 868 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { 869 t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) 870 } 871 } 872 // TODO(bradfitz, zhaoq): A copy is required here because there is no 873 // guarantee f.Data() is consumed before the arrival of next frame. 874 // Can this copy be eliminated? 875 if len(f.Data()) > 0 { 876 data := make([]byte, len(f.Data())) 877 copy(data, f.Data()) 878 s.write(recvMsg{data: data}) 879 } 880 } 881 // The server has closed the stream without sending trailers. Record that 882 // the read direction is closed, and set the status appropriately. 
883 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { 884 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true) 885 } 886} 887 888func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { 889 s, ok := t.getStream(f) 890 if !ok { 891 return 892 } 893 if f.ErrCode == http2.ErrCodeRefusedStream { 894 // The stream was unprocessed by the server. 895 atomic.StoreUint32(&s.unprocessed, 1) 896 } 897 statusCode, ok := http2ErrConvTab[f.ErrCode] 898 if !ok { 899 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode) 900 statusCode = codes.Unknown 901 } 902 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false) 903} 904 905func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) { 906 if f.IsAck() { 907 return 908 } 909 var maxStreams *uint32 910 var ss []http2.Setting 911 f.ForeachSetting(func(s http2.Setting) error { 912 if s.ID == http2.SettingMaxConcurrentStreams { 913 maxStreams = new(uint32) 914 *maxStreams = s.Val 915 return nil 916 } 917 ss = append(ss, s) 918 return nil 919 }) 920 if isFirst && maxStreams == nil { 921 maxStreams = new(uint32) 922 *maxStreams = math.MaxUint32 923 } 924 sf := &incomingSettings{ 925 ss: ss, 926 } 927 if maxStreams == nil { 928 t.controlBuf.put(sf) 929 return 930 } 931 updateStreamQuota := func(interface{}) bool { 932 delta := int64(*maxStreams) - int64(t.maxConcurrentStreams) 933 t.maxConcurrentStreams = *maxStreams 934 t.streamQuota += delta 935 if delta > 0 && t.waitingStreams > 0 { 936 close(t.streamsQuotaAvailable) // wake all of them up. 
			t.streamsQuotaAvailable = make(chan struct{}, 1)
		}
		return true
	}
	t.controlBuf.executeAndPut(updateStreamQuota, sf)
}

// handlePing processes an incoming PING frame. Acks are handed to the BDP
// estimator (they may be BDP probe replies); for a non-ack we queue a ping
// ack carrying the same 8-byte payload on the control buffer.
func (t *http2Client) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)
}

// handleGoAway processes a GOAWAY frame from the server. It records the
// GoAway reason, notifies the upper layers via the t.goAway channel, and
// closes every stream the server has declared unprocessed (IDs above
// f.LastStreamID, bounded by the previous GOAWAY's ID). The transport is
// closed outright on a malformed frame or when no active streams remain.
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
		infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
	}
	id := f.LastStreamID
	// Client-initiated stream IDs are odd; an even non-zero ID is a
	// protocol violation, so tear down the whole transport.
	if id > 0 && id%2 != 1 {
		t.mu.Unlock()
		t.Close()
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
	// sent after an RTT delay with the ID of the last stream the server will
	// process.
	//
	// Therefore, when we get the first GoAway we don't necessarily close any
	// streams. While in case of second GoAway we close all streams created after
	// the GoAwayId. This way streams that were in-flight while the GoAway from
	// server was being sent don't get killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
		if id > t.prevGoAwayID {
			t.mu.Unlock()
			t.Close()
			return
		}
	default:
		// First GOAWAY on this transport: record the reason, signal the
		// upper layer, and enter draining so no new streams are created.
		t.setGoAwayReason(f)
		close(t.goAway)
		t.state = draining
		t.controlBuf.put(&incomingGoAway{})
	}
	// All streams with IDs greater than the GoAwayId
	// and smaller than the previous GoAway ID should be killed.
	upperLimit := t.prevGoAwayID
	if upperLimit == 0 { // This is the first GoAway Frame.
		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
	}
	for streamID, stream := range t.activeStreams {
		if streamID > id && streamID <= upperLimit {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&stream.unprocessed, 1)
			t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
		}
	}
	t.prevGoAwayID = id
	active := len(t.activeStreams)
	t.mu.Unlock()
	// Nothing left to drain: close the transport now.
	if active == 0 {
		t.Close()
	}
}

// setGoAwayReason sets the value of t.goAwayReason based
// on the GoAway frame received.
// It expects a lock on transport's mutex to be held by
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
	t.goAwayReason = GoAwayNoReason
	switch f.ErrCode {
	case http2.ErrCodeEnhanceYourCalm:
		// "too_many_pings" is the debug payload the server attaches when it
		// closes the connection due to keepalive ping abuse.
		if string(f.DebugData()) == "too_many_pings" {
			t.goAwayReason = GoAwayTooManyPings
		}
	}
}

// GetGoAwayReason returns the reason recorded from the most recent GOAWAY
// frame, guarded by the transport mutex.
func (t *http2Client) GetGoAwayReason() GoAwayReason {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.goAwayReason
}

// handleWindowUpdate forwards a WINDOW_UPDATE frame to the loopy writer via
// the control buffer; flow-control bookkeeping happens there.
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}

// operateHeaders takes action on the decoded headers. Depending on whether
// the frame ends the stream it is treated as the response headers or as
// trailers: headers populate the stream and close headerChan; trailers
// (or a trailers-only response) close the stream with the decoded status.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
	s, ok := t.getStream(frame)
	if !ok {
		return
	}
	atomic.StoreUint32(&s.bytesReceived, 1)
	var state decodeState
	if err := state.decodeResponseHeader(frame); err != nil {
		t.closeStream(s, err, true, http2.ErrCodeProtocol, nil, nil, false)
		// Something wrong. Stops reading even when there is remaining.
		return
	}

	endStream := frame.StreamEnded()
	var isHeader bool
	// Report the frame to the stats handler as either InHeader or InTrailer,
	// after the header/trailer decision below has been made.
	defer func() {
		if t.statsHandler != nil {
			if isHeader {
				inHeader := &stats.InHeader{
					Client:     true,
					WireLength: int(frame.Header().Length),
				}
				t.statsHandler.HandleRPC(s.ctx, inHeader)
			} else {
				inTrailer := &stats.InTrailer{
					Client:     true,
					WireLength: int(frame.Header().Length),
				}
				t.statsHandler.HandleRPC(s.ctx, inTrailer)
			}
		}
	}()
	// If headers haven't been received yet. The atomic swap guarantees
	// headerChan is closed exactly once even if trailers race in.
	if atomic.SwapUint32(&s.headerDone, 1) == 0 {
		if !endStream {
			// Headers frame is not actually a trailers-only frame.
			isHeader = true
			// These values can be set without any synchronization because
			// stream goroutine will read it only after seeing a closed
			// headerChan which we'll close after setting this.
			s.recvCompress = state.encoding
			if len(state.mdata) > 0 {
				s.header = state.mdata
			}
		}
		close(s.headerChan)
	}
	if !endStream {
		return
	}
	// END_STREAM set: these were trailers; close the stream with the
	// decoded status and trailer metadata.
	t.closeStream(s, io.EOF, false, http2.ErrCodeNo, state.status(), state.mdata, true)
}

// reader runs as a separate goroutine in charge of reading data from network
// connection.
//
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
	defer close(t.readerDone)
	// Check the validity of server preface: the first frame from the server
	// must be a SETTINGS frame.
	frame, err := t.framer.fr.ReadFrame()
	if err != nil {
		t.Close()
		return
	}
	// Mark read activity for the keepalive goroutine.
	atomic.CompareAndSwapUint32(&t.activity, 0, 1)
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		t.Close()
		return
	}
	t.onSuccess()
	t.handleSettings(sf, true)

	// loop to keep reading incoming messages on this transport.
	for {
		frame, err := t.framer.fr.ReadFrame()
		atomic.CompareAndSwapUint32(&t.activity, 0, 1)
		if err != nil {
			// Abort an active stream if the http2.Framer returns a
			// http2.StreamError. This can happen only if the server's response
			// is malformed http2.
			if se, ok := err.(http2.StreamError); ok {
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					// use error detail to provide better err message
					t.closeStream(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()), true, http2.ErrCodeProtocol, nil, nil, false)
				}
				continue
			} else {
				// Transport error.
				t.Close()
				return
			}
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			t.operateHeaders(frame)
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame, false)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.GoAwayFrame:
			t.handleGoAway(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		default:
			errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
		}
	}
}

// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
	p := &ping{data: [8]byte{}}
	timer := time.NewTimer(t.kp.Time)
	for {
		select {
		case <-timer.C:
			// Read activity was seen since the last check; no ping needed,
			// just rearm the idle timer.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				timer.Reset(t.kp.Time)
				continue
			}
			// Check if keepalive should go dormant.
			t.mu.Lock()
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// Make awakenKeepalive writable.
				// NOTE(review): this drains the (presumably 1-buffered)
				// awakenKeepalive channel so the stream-creation path can
				// send on it without blocking — confirm against the sender.
				<-t.awakenKeepalive
				t.mu.Unlock()
				select {
				case <-t.awakenKeepalive:
					// If the control gets here a ping has been sent
					// need to reset the timer with keepalive.Timeout.
				case <-t.ctx.Done():
					return
				}
			} else {
				t.mu.Unlock()
				if channelz.IsOn() {
					t.czmu.Lock()
					t.kpCount++
					t.czmu.Unlock()
				}
				// Send ping.
				t.controlBuf.put(p)
			}

			// By the time control gets here a ping has been sent one way or the other.
			// Wait kp.Timeout for any read activity as evidence of life.
			timer.Reset(t.kp.Timeout)
			select {
			case <-timer.C:
				if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
					timer.Reset(t.kp.Time)
					continue
				}
				// No reads within the timeout: the connection is dead.
				t.Close()
				return
			case <-t.ctx.Done():
				// Stop the timer and drain it if it already fired, so the
				// channel is left clean.
				if !timer.Stop() {
					<-timer.C
				}
				return
			}
		case <-t.ctx.Done():
			if !timer.Stop() {
				<-timer.C
			}
			return
		}
	}
}

// Error returns a channel that is closed when the transport's context is
// canceled, i.e. when the transport encounters a fatal error or is closed.
func (t *http2Client) Error() <-chan struct{} {
	return t.ctx.Done()
}

// GoAway returns a channel that is closed when the server sends a GOAWAY
// frame on this transport.
func (t *http2Client) GoAway() <-chan struct{} {
	return t.goAway
}

// ChannelzMetric returns a snapshot of this transport's channelz socket
// metrics. Counter fields are read under czmu; the remote flow-control
// window is queried from the loopy writer outside the lock.
func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
	t.czmu.RLock()
	s := channelz.SocketInternalMetric{
		StreamsStarted:                  t.streamsStarted,
		StreamsSucceeded:                t.streamsSucceeded,
		StreamsFailed:                   t.streamsFailed,
		MessagesSent:                    t.msgSent,
		MessagesReceived:                t.msgRecv,
		KeepAlivesSent:                  t.kpCount,
		LastLocalStreamCreatedTimestamp: t.lastStreamCreated,
		LastMessageSentTimestamp:        t.lastMsgSent,
		LastMessageReceivedTimestamp:    t.lastMsgRecv,
		LocalFlowControlWindow:          int64(t.fc.getSize()),
		//socket options
		LocalAddr:  t.localAddr,
		RemoteAddr: t.remoteAddr,
		// Security
		// RemoteName :
	}
	t.czmu.RUnlock()
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}

// IncrMsgSent bumps the channelz sent-message counter and timestamp.
func (t *http2Client) IncrMsgSent() {
	t.czmu.Lock()
	t.msgSent++
	t.lastMsgSent = time.Now()
	t.czmu.Unlock()
}

// IncrMsgRecv bumps the channelz received-message counter and timestamp.
func (t *http2Client) IncrMsgRecv() {
	t.czmu.Lock()
	t.msgRecv++
	t.lastMsgRecv = time.Now()
	t.czmu.Unlock()
}

// getOutFlowWindow asks the loopy writer for the current outgoing
// flow-control window size. It returns -1 if the transport is closed and
// -2 if no reply arrives within one second.
func (t *http2Client) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.ctxDone:
		return -1
	case <-timer.C:
		return -2
	}
}