1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "context" 23 "fmt" 24 "io" 25 "math" 26 "net" 27 "strconv" 28 "strings" 29 "sync" 30 "sync/atomic" 31 "time" 32 33 "golang.org/x/net/http2" 34 "golang.org/x/net/http2/hpack" 35 36 "google.golang.org/grpc/codes" 37 "google.golang.org/grpc/credentials" 38 "google.golang.org/grpc/internal/channelz" 39 "google.golang.org/grpc/internal/syscall" 40 "google.golang.org/grpc/keepalive" 41 "google.golang.org/grpc/metadata" 42 "google.golang.org/grpc/peer" 43 "google.golang.org/grpc/stats" 44 "google.golang.org/grpc/status" 45) 46 47// http2Client implements the ClientTransport interface with HTTP2. 48type http2Client struct { 49 ctx context.Context 50 cancel context.CancelFunc 51 ctxDone <-chan struct{} // Cache the ctx.Done() chan. 52 userAgent string 53 md interface{} 54 conn net.Conn // underlying communication channel 55 loopy *loopyWriter 56 remoteAddr net.Addr 57 localAddr net.Addr 58 authInfo credentials.AuthInfo // auth info about the connection 59 60 readerDone chan struct{} // sync point to enable testing. 61 writerDone chan struct{} // sync point to enable testing. 62 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) 63 // that the server sent GoAway on this transport. 64 goAway chan struct{} 65 // awakenKeepalive is used to wake up keepalive when after it has gone dormant. 66 awakenKeepalive chan struct{} 67 68 framer *framer 69 // controlBuf delivers all the control related tasks (e.g., window 70 // updates, reset streams, and various settings) to the controller. 71 controlBuf *controlBuffer 72 fc *trInFlow 73 // The scheme used: https if TLS is on, http otherwise. 74 scheme string 75 76 isSecure bool 77 78 perRPCCreds []credentials.PerRPCCredentials 79 80 // Boolean to keep track of reading activity on transport. 81 // 1 is true and 0 is false. 82 activity uint32 // Accessed atomically. 83 kp keepalive.ClientParameters 84 keepaliveEnabled bool 85 86 statsHandler stats.Handler 87 88 initialWindowSize int32 89 90 // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE 91 maxSendHeaderListSize *uint32 92 93 bdpEst *bdpEstimator 94 // onPrefaceReceipt is a callback that client transport calls upon 95 // receiving server preface to signal that a succefull HTTP2 96 // connection was established. 97 onPrefaceReceipt func() 98 99 maxConcurrentStreams uint32 100 streamQuota int64 101 streamsQuotaAvailable chan struct{} 102 waitingStreams uint32 103 nextID uint32 104 105 mu sync.Mutex // guard the following variables 106 state transportState 107 activeStreams map[uint32]*Stream 108 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. 109 prevGoAwayID uint32 110 // goAwayReason records the http2.ErrCode and debug data received with the 111 // GoAway frame. 112 goAwayReason GoAwayReason 113 114 // Fields below are for channelz metric collection. 115 channelzID int64 // channelz unique identification number 116 czData *channelzData 117 118 onGoAway func(GoAwayReason) 119 onClose func() 120 121 bufferPool *bufferPool 122} 123 124func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { 125 if fn != nil { 126 return fn(ctx, addr) 127 } 128 return (&net.Dialer{}).DialContext(ctx, "tcp", addr) 129} 130 131func isTemporary(err error) bool { 132 switch err := err.(type) { 133 case interface { 134 Temporary() bool 135 }: 136 return err.Temporary() 137 case interface { 138 Timeout() bool 139 }: 140 // Timeouts may be resolved upon retry, and are thus treated as 141 // temporary. 142 return err.Timeout() 143 } 144 return true 145} 146 147// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 148// and starts to receive messages on it. Non-nil error returns if construction 149// fails. 150func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) { 151 scheme := "http" 152 ctx, cancel := context.WithCancel(ctx) 153 defer func() { 154 if err != nil { 155 cancel() 156 } 157 }() 158 159 conn, err := dial(connectCtx, opts.Dialer, addr.Addr) 160 if err != nil { 161 if opts.FailOnNonTempDialError { 162 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) 163 } 164 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) 165 } 166 // Any further errors will close the underlying connection 167 defer func(conn net.Conn) { 168 if err != nil { 169 conn.Close() 170 } 171 }(conn) 172 kp := opts.KeepaliveParams 173 // Validate keepalive parameters. 174 if kp.Time == 0 { 175 kp.Time = defaultClientKeepaliveTime 176 } 177 if kp.Timeout == 0 { 178 kp.Timeout = defaultClientKeepaliveTimeout 179 } 180 keepaliveEnabled := false 181 if kp.Time != infinity { 182 if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil { 183 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err) 184 } 185 keepaliveEnabled = true 186 } 187 var ( 188 isSecure bool 189 authInfo credentials.AuthInfo 190 ) 191 transportCreds := opts.TransportCredentials 192 perRPCCreds := opts.PerRPCCredentials 193 194 if b := opts.CredsBundle; b != nil { 195 if t := b.TransportCredentials(); t != nil { 196 transportCreds = t 197 } 198 if t := b.PerRPCCredentials(); t != nil { 199 perRPCCreds = append(perRPCCreds, t) 200 } 201 } 202 if transportCreds != nil { 203 scheme = "https" 204 conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn) 205 if err != nil { 206 return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err) 207 } 208 isSecure = true 209 } 210 dynamicWindow := true 211 icwz := int32(initialWindowSize) 212 if opts.InitialConnWindowSize >= defaultWindowSize { 213 icwz = opts.InitialConnWindowSize 214 dynamicWindow = false 215 } 216 writeBufSize := opts.WriteBufferSize 217 readBufSize := opts.ReadBufferSize 218 maxHeaderListSize := defaultClientMaxHeaderListSize 219 if opts.MaxHeaderListSize != nil { 220 maxHeaderListSize = *opts.MaxHeaderListSize 221 } 222 t := &http2Client{ 223 ctx: ctx, 224 ctxDone: ctx.Done(), // Cache Done chan. 225 cancel: cancel, 226 userAgent: opts.UserAgent, 227 md: addr.Metadata, 228 conn: conn, 229 remoteAddr: conn.RemoteAddr(), 230 localAddr: conn.LocalAddr(), 231 authInfo: authInfo, 232 readerDone: make(chan struct{}), 233 writerDone: make(chan struct{}), 234 goAway: make(chan struct{}), 235 awakenKeepalive: make(chan struct{}, 1), 236 framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize), 237 fc: &trInFlow{limit: uint32(icwz)}, 238 scheme: scheme, 239 activeStreams: make(map[uint32]*Stream), 240 isSecure: isSecure, 241 perRPCCreds: perRPCCreds, 242 kp: kp, 243 statsHandler: opts.StatsHandler, 244 initialWindowSize: initialWindowSize, 245 onPrefaceReceipt: onPrefaceReceipt, 246 nextID: 1, 247 maxConcurrentStreams: defaultMaxStreamsClient, 248 streamQuota: defaultMaxStreamsClient, 249 streamsQuotaAvailable: make(chan struct{}, 1), 250 czData: new(channelzData), 251 onGoAway: onGoAway, 252 onClose: onClose, 253 keepaliveEnabled: keepaliveEnabled, 254 bufferPool: newBufferPool(), 255 } 256 t.controlBuf = newControlBuffer(t.ctxDone) 257 if opts.InitialWindowSize >= defaultWindowSize { 258 t.initialWindowSize = opts.InitialWindowSize 259 dynamicWindow = false 260 } 261 if dynamicWindow { 262 t.bdpEst = &bdpEstimator{ 263 bdp: initialWindowSize, 264 updateFlowControl: t.updateFlowControl, 265 } 266 } 267 // Make sure awakenKeepalive can't be written upon. 268 // keepalive routine will make it writable, if need be. 269 t.awakenKeepalive <- struct{}{} 270 if t.statsHandler != nil { 271 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ 272 RemoteAddr: t.remoteAddr, 273 LocalAddr: t.localAddr, 274 }) 275 connBegin := &stats.ConnBegin{ 276 Client: true, 277 } 278 t.statsHandler.HandleConn(t.ctx, connBegin) 279 } 280 if channelz.IsOn() { 281 t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) 282 } 283 if t.keepaliveEnabled { 284 go t.keepalive() 285 } 286 // Start the reader goroutine for incoming message. Each transport has 287 // a dedicated goroutine which reads HTTP2 frame from network. Then it 288 // dispatches the frame to the corresponding stream entity. 289 go t.reader() 290 291 // Send connection preface to server. 292 n, err := t.conn.Write(clientPreface) 293 if err != nil { 294 t.Close() 295 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) 296 } 297 if n != len(clientPreface) { 298 t.Close() 299 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) 300 } 301 var ss []http2.Setting 302 303 if t.initialWindowSize != defaultWindowSize { 304 ss = append(ss, http2.Setting{ 305 ID: http2.SettingInitialWindowSize, 306 Val: uint32(t.initialWindowSize), 307 }) 308 } 309 if opts.MaxHeaderListSize != nil { 310 ss = append(ss, http2.Setting{ 311 ID: http2.SettingMaxHeaderListSize, 312 Val: *opts.MaxHeaderListSize, 313 }) 314 } 315 err = t.framer.fr.WriteSettings(ss...) 316 if err != nil { 317 t.Close() 318 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) 319 } 320 // Adjust the connection flow control window if needed. 321 if delta := uint32(icwz - defaultWindowSize); delta > 0 { 322 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil { 323 t.Close() 324 return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) 325 } 326 } 327 328 if err := t.framer.writer.Flush(); err != nil { 329 return nil, err 330 } 331 go func() { 332 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst) 333 err := t.loopy.run() 334 if err != nil { 335 errorf("transport: loopyWriter.run returning. Err: %v", err) 336 } 337 // If it's a connection error, let reader goroutine handle it 338 // since there might be data in the buffers. 339 if _, ok := err.(net.Error); !ok { 340 t.conn.Close() 341 } 342 close(t.writerDone) 343 }() 344 return t, nil 345} 346 347func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { 348 // TODO(zhaoq): Handle uint32 overflow of Stream.id. 349 s := &Stream{ 350 done: make(chan struct{}), 351 method: callHdr.Method, 352 sendCompress: callHdr.SendCompress, 353 buf: newRecvBuffer(), 354 headerChan: make(chan struct{}), 355 contentSubtype: callHdr.ContentSubtype, 356 } 357 s.wq = newWriteQuota(defaultWriteQuota, s.done) 358 s.requestRead = func(n int) { 359 t.adjustWindow(s, uint32(n)) 360 } 361 // The client side stream context should have exactly the same life cycle with the user provided context. 362 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. 363 // So we use the original context here instead of creating a copy. 364 s.ctx = ctx 365 s.trReader = &transportReader{ 366 reader: &recvBufferReader{ 367 ctx: s.ctx, 368 ctxDone: s.ctx.Done(), 369 recv: s.buf, 370 closeStream: func(err error) { 371 t.CloseStream(s, err) 372 }, 373 freeBuffer: t.bufferPool.put, 374 }, 375 windowHandler: func(n int) { 376 t.updateWindow(s, uint32(n)) 377 }, 378 } 379 return s 380} 381 382func (t *http2Client) getPeer() *peer.Peer { 383 pr := &peer.Peer{ 384 Addr: t.remoteAddr, 385 } 386 // Attach Auth info if there is any. 387 if t.authInfo != nil { 388 pr.AuthInfo = t.authInfo 389 } 390 return pr 391} 392 393func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) { 394 aud := t.createAudience(callHdr) 395 authData, err := t.getTrAuthData(ctx, aud) 396 if err != nil { 397 return nil, err 398 } 399 callAuthData, err := t.getCallAuthData(ctx, aud, callHdr) 400 if err != nil { 401 return nil, err 402 } 403 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 404 // first and create a slice of that exact size. 405 // Make the slice of certain predictable size to reduce allocations made by append. 406 hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te 407 hfLen += len(authData) + len(callAuthData) 408 headerFields := make([]hpack.HeaderField, 0, hfLen) 409 headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"}) 410 headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme}) 411 headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method}) 412 headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) 413 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)}) 414 headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) 415 headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"}) 416 if callHdr.PreviousAttempts > 0 { 417 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)}) 418 } 419 420 if callHdr.SendCompress != "" { 421 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) 422 } 423 if dl, ok := ctx.Deadline(); ok { 424 // Send out timeout regardless its value. The server can detect timeout context by itself. 425 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire. 426 timeout := time.Until(dl) 427 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) 428 } 429 for k, v := range authData { 430 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 431 } 432 for k, v := range callAuthData { 433 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 434 } 435 if b := stats.OutgoingTags(ctx); b != nil { 436 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)}) 437 } 438 if b := stats.OutgoingTrace(ctx); b != nil { 439 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)}) 440 } 441 442 if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { 443 var k string 444 for k, vv := range md { 445 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. 446 if isReservedHeader(k) { 447 continue 448 } 449 for _, v := range vv { 450 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 451 } 452 } 453 for _, vv := range added { 454 for i, v := range vv { 455 if i%2 == 0 { 456 k = v 457 continue 458 } 459 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. 460 if isReservedHeader(k) { 461 continue 462 } 463 headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)}) 464 } 465 } 466 } 467 if md, ok := t.md.(*metadata.MD); ok { 468 for k, vv := range *md { 469 if isReservedHeader(k) { 470 continue 471 } 472 for _, v := range vv { 473 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 474 } 475 } 476 } 477 return headerFields, nil 478} 479 480func (t *http2Client) createAudience(callHdr *CallHdr) string { 481 // Create an audience string only if needed. 482 if len(t.perRPCCreds) == 0 && callHdr.Creds == nil { 483 return "" 484 } 485 // Construct URI required to get auth request metadata. 486 // Omit port if it is the default one. 487 host := strings.TrimSuffix(callHdr.Host, ":443") 488 pos := strings.LastIndex(callHdr.Method, "/") 489 if pos == -1 { 490 pos = len(callHdr.Method) 491 } 492 return "https://" + host + callHdr.Method[:pos] 493} 494 495func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) { 496 if len(t.perRPCCreds) == 0 { 497 return nil, nil 498 } 499 authData := map[string]string{} 500 for _, c := range t.perRPCCreds { 501 data, err := c.GetRequestMetadata(ctx, audience) 502 if err != nil { 503 if _, ok := status.FromError(err); ok { 504 return nil, err 505 } 506 507 return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err) 508 } 509 for k, v := range data { 510 // Capital header names are illegal in HTTP/2. 511 k = strings.ToLower(k) 512 authData[k] = v 513 } 514 } 515 return authData, nil 516} 517 518func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) { 519 var callAuthData map[string]string 520 // Check if credentials.PerRPCCredentials were provided via call options. 521 // Note: if these credentials are provided both via dial options and call 522 // options, then both sets of credentials will be applied. 523 if callCreds := callHdr.Creds; callCreds != nil { 524 if !t.isSecure && callCreds.RequireTransportSecurity() { 525 return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection") 526 } 527 data, err := callCreds.GetRequestMetadata(ctx, audience) 528 if err != nil { 529 return nil, status.Errorf(codes.Internal, "transport: %v", err) 530 } 531 callAuthData = make(map[string]string, len(data)) 532 for k, v := range data { 533 // Capital header names are illegal in HTTP/2 534 k = strings.ToLower(k) 535 callAuthData[k] = v 536 } 537 } 538 return callAuthData, nil 539} 540 541// NewStream creates a stream and registers it into the transport as "active" 542// streams. 543func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { 544 ctx = peer.NewContext(ctx, t.getPeer()) 545 headerFields, err := t.createHeaderFields(ctx, callHdr) 546 if err != nil { 547 return nil, err 548 } 549 s := t.newStream(ctx, callHdr) 550 cleanup := func(err error) { 551 if s.swapState(streamDone) == streamDone { 552 // If it was already done, return. 553 return 554 } 555 // The stream was unprocessed by the server. 556 atomic.StoreUint32(&s.unprocessed, 1) 557 s.write(recvMsg{err: err}) 558 close(s.done) 559 // If headerChan isn't closed, then close it. 560 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { 561 close(s.headerChan) 562 } 563 } 564 hdr := &headerFrame{ 565 hf: headerFields, 566 endStream: false, 567 initStream: func(id uint32) (bool, error) { 568 t.mu.Lock() 569 if state := t.state; state != reachable { 570 t.mu.Unlock() 571 // Do a quick cleanup. 572 err := error(errStreamDrain) 573 if state == closing { 574 err = ErrConnClosing 575 } 576 cleanup(err) 577 return false, err 578 } 579 t.activeStreams[id] = s 580 if channelz.IsOn() { 581 atomic.AddInt64(&t.czData.streamsStarted, 1) 582 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) 583 } 584 var sendPing bool 585 // If the number of active streams change from 0 to 1, then check if keepalive 586 // has gone dormant. If so, wake it up. 587 if len(t.activeStreams) == 1 && t.keepaliveEnabled { 588 select { 589 case t.awakenKeepalive <- struct{}{}: 590 sendPing = true 591 // Fill the awakenKeepalive channel again as this channel must be 592 // kept non-writable except at the point that the keepalive() 593 // goroutine is waiting either to be awaken or shutdown. 594 t.awakenKeepalive <- struct{}{} 595 default: 596 } 597 } 598 t.mu.Unlock() 599 return sendPing, nil 600 }, 601 onOrphaned: cleanup, 602 wq: s.wq, 603 } 604 firstTry := true 605 var ch chan struct{} 606 checkForStreamQuota := func(it interface{}) bool { 607 if t.streamQuota <= 0 { // Can go negative if server decreases it. 608 if firstTry { 609 t.waitingStreams++ 610 } 611 ch = t.streamsQuotaAvailable 612 return false 613 } 614 if !firstTry { 615 t.waitingStreams-- 616 } 617 t.streamQuota-- 618 h := it.(*headerFrame) 619 h.streamID = t.nextID 620 t.nextID += 2 621 s.id = h.streamID 622 s.fc = &inFlow{limit: uint32(t.initialWindowSize)} 623 if t.streamQuota > 0 && t.waitingStreams > 0 { 624 select { 625 case t.streamsQuotaAvailable <- struct{}{}: 626 default: 627 } 628 } 629 return true 630 } 631 var hdrListSizeErr error 632 checkForHeaderListSize := func(it interface{}) bool { 633 if t.maxSendHeaderListSize == nil { 634 return true 635 } 636 hdrFrame := it.(*headerFrame) 637 var sz int64 638 for _, f := range hdrFrame.hf { 639 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { 640 hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize) 641 return false 642 } 643 } 644 return true 645 } 646 for { 647 success, err := t.controlBuf.executeAndPut(func(it interface{}) bool { 648 if !checkForStreamQuota(it) { 649 return false 650 } 651 if !checkForHeaderListSize(it) { 652 return false 653 } 654 return true 655 }, hdr) 656 if err != nil { 657 return nil, err 658 } 659 if success { 660 break 661 } 662 if hdrListSizeErr != nil { 663 return nil, hdrListSizeErr 664 } 665 firstTry = false 666 select { 667 case <-ch: 668 case <-s.ctx.Done(): 669 return nil, ContextErr(s.ctx.Err()) 670 case <-t.goAway: 671 return nil, errStreamDrain 672 case <-t.ctx.Done(): 673 return nil, ErrConnClosing 674 } 675 } 676 if t.statsHandler != nil { 677 outHeader := &stats.OutHeader{ 678 Client: true, 679 FullMethod: callHdr.Method, 680 RemoteAddr: t.remoteAddr, 681 LocalAddr: t.localAddr, 682 Compression: callHdr.SendCompress, 683 } 684 t.statsHandler.HandleRPC(s.ctx, outHeader) 685 } 686 return s, nil 687} 688 689// CloseStream clears the footprint of a stream when the stream is not needed any more. 690// This must not be executed in reader's goroutine. 691func (t *http2Client) CloseStream(s *Stream, err error) { 692 var ( 693 rst bool 694 rstCode http2.ErrCode 695 ) 696 if err != nil { 697 rst = true 698 rstCode = http2.ErrCodeCancel 699 } 700 t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false) 701} 702 703func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) { 704 // Set stream status to done. 705 if s.swapState(streamDone) == streamDone { 706 // If it was already done, return. If multiple closeStream calls 707 // happen simultaneously, wait for the first to finish. 708 <-s.done 709 return 710 } 711 // status and trailers can be updated here without any synchronization because the stream goroutine will 712 // only read it after it sees an io.EOF error from read or write and we'll write those errors 713 // only after updating this. 714 s.status = st 715 if len(mdata) > 0 { 716 s.trailer = mdata 717 } 718 if err != nil { 719 // This will unblock reads eventually. 720 s.write(recvMsg{err: err}) 721 } 722 // If headerChan isn't closed, then close it. 723 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { 724 s.noHeaders = true 725 close(s.headerChan) 726 } 727 cleanup := &cleanupStream{ 728 streamID: s.id, 729 onWrite: func() { 730 t.mu.Lock() 731 if t.activeStreams != nil { 732 delete(t.activeStreams, s.id) 733 } 734 t.mu.Unlock() 735 if channelz.IsOn() { 736 if eosReceived { 737 atomic.AddInt64(&t.czData.streamsSucceeded, 1) 738 } else { 739 atomic.AddInt64(&t.czData.streamsFailed, 1) 740 } 741 } 742 }, 743 rst: rst, 744 rstCode: rstCode, 745 } 746 addBackStreamQuota := func(interface{}) bool { 747 t.streamQuota++ 748 if t.streamQuota > 0 && t.waitingStreams > 0 { 749 select { 750 case t.streamsQuotaAvailable <- struct{}{}: 751 default: 752 } 753 } 754 return true 755 } 756 t.controlBuf.executeAndPut(addBackStreamQuota, cleanup) 757 // This will unblock write. 758 close(s.done) 759} 760 761// Close kicks off the shutdown process of the transport. This should be called 762// only once on a transport. Once it is called, the transport should not be 763// accessed any more. 764// 765// This method blocks until the addrConn that initiated this transport is 766// re-connected. This happens because t.onClose() begins reconnect logic at the 767// addrConn level and blocks until the addrConn is successfully connected. 768func (t *http2Client) Close() error { 769 t.mu.Lock() 770 // Make sure we only Close once. 771 if t.state == closing { 772 t.mu.Unlock() 773 return nil 774 } 775 // Call t.onClose before setting the state to closing to prevent the client 776 // from attempting to create new streams ASAP. 777 t.onClose() 778 t.state = closing 779 streams := t.activeStreams 780 t.activeStreams = nil 781 t.mu.Unlock() 782 t.controlBuf.finish() 783 t.cancel() 784 err := t.conn.Close() 785 if channelz.IsOn() { 786 channelz.RemoveEntry(t.channelzID) 787 } 788 // Notify all active streams. 789 for _, s := range streams { 790 t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false) 791 } 792 if t.statsHandler != nil { 793 connEnd := &stats.ConnEnd{ 794 Client: true, 795 } 796 t.statsHandler.HandleConn(t.ctx, connEnd) 797 } 798 return err 799} 800 801// GracefulClose sets the state to draining, which prevents new streams from 802// being created and causes the transport to be closed when the last active 803// stream is closed. If there are no active streams, the transport is closed 804// immediately. This does nothing if the transport is already draining or 805// closing. 806func (t *http2Client) GracefulClose() { 807 t.mu.Lock() 808 // Make sure we move to draining only from active. 809 if t.state == draining || t.state == closing { 810 t.mu.Unlock() 811 return 812 } 813 t.state = draining 814 active := len(t.activeStreams) 815 t.mu.Unlock() 816 if active == 0 { 817 t.Close() 818 return 819 } 820 t.controlBuf.put(&incomingGoAway{}) 821} 822 823// Write formats the data into HTTP2 data frame(s) and sends it out. The caller 824// should proceed only if Write returns nil. 825func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { 826 if opts.Last { 827 // If it's the last message, update stream state. 828 if !s.compareAndSwapState(streamActive, streamWriteDone) { 829 return errStreamDone 830 } 831 } else if s.getState() != streamActive { 832 return errStreamDone 833 } 834 df := &dataFrame{ 835 streamID: s.id, 836 endStream: opts.Last, 837 } 838 if hdr != nil || data != nil { // If it's not an empty data frame. 839 // Add some data to grpc message header so that we can equally 840 // distribute bytes across frames. 841 emptyLen := http2MaxFrameLen - len(hdr) 842 if emptyLen > len(data) { 843 emptyLen = len(data) 844 } 845 hdr = append(hdr, data[:emptyLen]...) 846 data = data[emptyLen:] 847 df.h, df.d = hdr, data 848 // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler. 849 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { 850 return err 851 } 852 } 853 return t.controlBuf.put(df) 854} 855 856func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { 857 t.mu.Lock() 858 defer t.mu.Unlock() 859 s, ok := t.activeStreams[f.Header().StreamID] 860 return s, ok 861} 862 863// adjustWindow sends out extra window update over the initial window size 864// of stream if the application is requesting data larger in size than 865// the window. 866func (t *http2Client) adjustWindow(s *Stream, n uint32) { 867 if w := s.fc.maybeAdjust(n); w > 0 { 868 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 869 } 870} 871 872// updateWindow adjusts the inbound quota for the stream. 873// Window updates will be sent out when the cumulative quota 874// exceeds the corresponding threshold. 875func (t *http2Client) updateWindow(s *Stream, n uint32) { 876 if w := s.fc.onRead(n); w > 0 { 877 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 878 } 879} 880 881// updateFlowControl updates the incoming flow control windows 882// for the transport and the stream based on the current bdp 883// estimation. 884func (t *http2Client) updateFlowControl(n uint32) { 885 t.mu.Lock() 886 for _, s := range t.activeStreams { 887 s.fc.newLimit(n) 888 } 889 t.mu.Unlock() 890 updateIWS := func(interface{}) bool { 891 t.initialWindowSize = int32(n) 892 return true 893 } 894 t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)}) 895 t.controlBuf.put(&outgoingSettings{ 896 ss: []http2.Setting{ 897 { 898 ID: http2.SettingInitialWindowSize, 899 Val: n, 900 }, 901 }, 902 }) 903} 904 905func (t *http2Client) handleData(f *http2.DataFrame) { 906 size := f.Header().Length 907 var sendBDPPing bool 908 if t.bdpEst != nil { 909 sendBDPPing = t.bdpEst.add(size) 910 } 911 // Decouple connection's flow control from application's read. 912 // An update on connection's flow control should not depend on 913 // whether user application has read the data or not. Such a 914 // restriction is already imposed on the stream's flow control, 915 // and therefore the sender will be blocked anyways. 916 // Decoupling the connection flow control will prevent other 917 // active(fast) streams from starving in presence of slow or 918 // inactive streams. 919 // 920 if w := t.fc.onData(size); w > 0 { 921 t.controlBuf.put(&outgoingWindowUpdate{ 922 streamID: 0, 923 increment: w, 924 }) 925 } 926 if sendBDPPing { 927 // Avoid excessive ping detection (e.g. in an L7 proxy) 928 // by sending a window update prior to the BDP ping. 929 930 if w := t.fc.reset(); w > 0 { 931 t.controlBuf.put(&outgoingWindowUpdate{ 932 streamID: 0, 933 increment: w, 934 }) 935 } 936 937 t.controlBuf.put(bdpPing) 938 } 939 // Select the right stream to dispatch. 940 s, ok := t.getStream(f) 941 if !ok { 942 return 943 } 944 if size > 0 { 945 if err := s.fc.onData(size); err != nil { 946 t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false) 947 return 948 } 949 if f.Header().Flags.Has(http2.FlagDataPadded) { 950 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { 951 t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) 952 } 953 } 954 // TODO(bradfitz, zhaoq): A copy is required here because there is no 955 // guarantee f.Data() is consumed before the arrival of next frame. 956 // Can this copy be eliminated? 957 if len(f.Data()) > 0 { 958 buffer := t.bufferPool.get() 959 buffer.Reset() 960 buffer.Write(f.Data()) 961 s.write(recvMsg{buffer: buffer}) 962 } 963 } 964 // The server has closed the stream without sending trailers. Record that 965 // the read direction is closed, and set the status appropriately. 966 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { 967 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true) 968 } 969} 970 971func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { 972 s, ok := t.getStream(f) 973 if !ok { 974 return 975 } 976 if f.ErrCode == http2.ErrCodeRefusedStream { 977 // The stream was unprocessed by the server. 978 atomic.StoreUint32(&s.unprocessed, 1) 979 } 980 statusCode, ok := http2ErrConvTab[f.ErrCode] 981 if !ok { 982 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode) 983 statusCode = codes.Unknown 984 } 985 if statusCode == codes.Canceled { 986 if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) { 987 // Our deadline was already exceeded, and that was likely the cause 988 // of this cancelation. Alter the status code accordingly. 989 statusCode = codes.DeadlineExceeded 990 } 991 } 992 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false) 993} 994 995func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) { 996 if f.IsAck() { 997 return 998 } 999 var maxStreams *uint32 1000 var ss []http2.Setting 1001 var updateFuncs []func() 1002 f.ForeachSetting(func(s http2.Setting) error { 1003 switch s.ID { 1004 case http2.SettingMaxConcurrentStreams: 1005 maxStreams = new(uint32) 1006 *maxStreams = s.Val 1007 case http2.SettingMaxHeaderListSize: 1008 updateFuncs = append(updateFuncs, func() { 1009 t.maxSendHeaderListSize = new(uint32) 1010 *t.maxSendHeaderListSize = s.Val 1011 }) 1012 default: 1013 ss = append(ss, s) 1014 } 1015 return nil 1016 }) 1017 if isFirst && maxStreams == nil { 1018 maxStreams = new(uint32) 1019 *maxStreams = math.MaxUint32 1020 } 1021 sf := &incomingSettings{ 1022 ss: ss, 1023 } 1024 if maxStreams != nil { 1025 updateStreamQuota := func() { 1026 delta := int64(*maxStreams) - int64(t.maxConcurrentStreams) 1027 t.maxConcurrentStreams = *maxStreams 1028 t.streamQuota += delta 1029 if delta > 0 && t.waitingStreams > 0 { 1030 close(t.streamsQuotaAvailable) // wake all of them up. 1031 t.streamsQuotaAvailable = make(chan struct{}, 1) 1032 } 1033 } 1034 updateFuncs = append(updateFuncs, updateStreamQuota) 1035 } 1036 t.controlBuf.executeAndPut(func(interface{}) bool { 1037 for _, f := range updateFuncs { 1038 f() 1039 } 1040 return true 1041 }, sf) 1042} 1043 1044func (t *http2Client) handlePing(f *http2.PingFrame) { 1045 if f.IsAck() { 1046 // Maybe it's a BDP ping. 1047 if t.bdpEst != nil { 1048 t.bdpEst.calculate(f.Data) 1049 } 1050 return 1051 } 1052 pingAck := &ping{ack: true} 1053 copy(pingAck.data[:], f.Data[:]) 1054 t.controlBuf.put(pingAck) 1055} 1056 1057func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { 1058 t.mu.Lock() 1059 if t.state == closing { 1060 t.mu.Unlock() 1061 return 1062 } 1063 if f.ErrCode == http2.ErrCodeEnhanceYourCalm { 1064 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") 1065 } 1066 id := f.LastStreamID 1067 if id > 0 && id%2 != 1 { 1068 t.mu.Unlock() 1069 t.Close() 1070 return 1071 } 1072 // A client can receive multiple GoAways from the server (see 1073 // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first 1074 // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be 1075 // sent after an RTT delay with the ID of the last stream the server will 1076 // process. 1077 // 1078 // Therefore, when we get the first GoAway we don't necessarily close any 1079 // streams. While in case of second GoAway we close all streams created after 1080 // the GoAwayId. This way streams that were in-flight while the GoAway from 1081 // server was being sent don't get killed. 1082 select { 1083 case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways). 1084 // If there are multiple GoAways the first one should always have an ID greater than the following ones. 1085 if id > t.prevGoAwayID { 1086 t.mu.Unlock() 1087 t.Close() 1088 return 1089 } 1090 default: 1091 t.setGoAwayReason(f) 1092 close(t.goAway) 1093 t.controlBuf.put(&incomingGoAway{}) 1094 // Notify the clientconn about the GOAWAY before we set the state to 1095 // draining, to allow the client to stop attempting to create streams 1096 // before disallowing new streams on this connection. 1097 t.onGoAway(t.goAwayReason) 1098 t.state = draining 1099 } 1100 // All streams with IDs greater than the GoAwayId 1101 // and smaller than the previous GoAway ID should be killed. 1102 upperLimit := t.prevGoAwayID 1103 if upperLimit == 0 { // This is the first GoAway Frame. 1104 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. 1105 } 1106 for streamID, stream := range t.activeStreams { 1107 if streamID > id && streamID <= upperLimit { 1108 // The stream was unprocessed by the server. 1109 atomic.StoreUint32(&stream.unprocessed, 1) 1110 t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) 1111 } 1112 } 1113 t.prevGoAwayID = id 1114 active := len(t.activeStreams) 1115 t.mu.Unlock() 1116 if active == 0 { 1117 t.Close() 1118 } 1119} 1120 1121// setGoAwayReason sets the value of t.goAwayReason based 1122// on the GoAway frame received. 1123// It expects a lock on transport's mutext to be held by 1124// the caller. 1125func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { 1126 t.goAwayReason = GoAwayNoReason 1127 switch f.ErrCode { 1128 case http2.ErrCodeEnhanceYourCalm: 1129 if string(f.DebugData()) == "too_many_pings" { 1130 t.goAwayReason = GoAwayTooManyPings 1131 } 1132 } 1133} 1134 1135func (t *http2Client) GetGoAwayReason() GoAwayReason { 1136 t.mu.Lock() 1137 defer t.mu.Unlock() 1138 return t.goAwayReason 1139} 1140 1141func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { 1142 t.controlBuf.put(&incomingWindowUpdate{ 1143 streamID: f.Header().StreamID, 1144 increment: f.Increment, 1145 }) 1146} 1147 1148// operateHeaders takes action on the decoded headers. 1149func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { 1150 s, ok := t.getStream(frame) 1151 if !ok { 1152 return 1153 } 1154 endStream := frame.StreamEnded() 1155 atomic.StoreUint32(&s.bytesReceived, 1) 1156 initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0 1157 1158 if !initialHeader && !endStream { 1159 // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set. 1160 st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream") 1161 t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false) 1162 return 1163 } 1164 1165 state := &decodeState{} 1166 // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. 1167 state.data.isGRPC = !initialHeader 1168 if err := state.decodeHeader(frame); err != nil { 1169 t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) 1170 return 1171 } 1172 1173 isHeader := false 1174 defer func() { 1175 if t.statsHandler != nil { 1176 if isHeader { 1177 inHeader := &stats.InHeader{ 1178 Client: true, 1179 WireLength: int(frame.Header().Length), 1180 } 1181 t.statsHandler.HandleRPC(s.ctx, inHeader) 1182 } else { 1183 inTrailer := &stats.InTrailer{ 1184 Client: true, 1185 WireLength: int(frame.Header().Length), 1186 } 1187 t.statsHandler.HandleRPC(s.ctx, inTrailer) 1188 } 1189 } 1190 }() 1191 1192 // If headerChan hasn't been closed yet 1193 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { 1194 if !endStream { 1195 // HEADERS frame block carries a Response-Headers. 1196 isHeader = true 1197 // These values can be set without any synchronization because 1198 // stream goroutine will read it only after seeing a closed 1199 // headerChan which we'll close after setting this. 1200 s.recvCompress = state.data.encoding 1201 if len(state.data.mdata) > 0 { 1202 s.header = state.data.mdata 1203 } 1204 } else { 1205 // HEADERS frame block carries a Trailers-Only. 1206 s.noHeaders = true 1207 } 1208 close(s.headerChan) 1209 } 1210 1211 if !endStream { 1212 return 1213 } 1214 1215 // if client received END_STREAM from server while stream was still active, send RST_STREAM 1216 rst := s.getState() == streamActive 1217 t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true) 1218} 1219 1220// reader runs as a separate goroutine in charge of reading data from network 1221// connection. 1222// 1223// TODO(zhaoq): currently one reader per transport. Investigate whether this is 1224// optimal. 1225// TODO(zhaoq): Check the validity of the incoming frame sequence. 1226func (t *http2Client) reader() { 1227 defer close(t.readerDone) 1228 // Check the validity of server preface. 1229 frame, err := t.framer.fr.ReadFrame() 1230 if err != nil { 1231 t.Close() // this kicks off resetTransport, so must be last before return 1232 return 1233 } 1234 t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!) 1235 if t.keepaliveEnabled { 1236 atomic.CompareAndSwapUint32(&t.activity, 0, 1) 1237 } 1238 sf, ok := frame.(*http2.SettingsFrame) 1239 if !ok { 1240 t.Close() // this kicks off resetTransport, so must be last before return 1241 return 1242 } 1243 t.onPrefaceReceipt() 1244 t.handleSettings(sf, true) 1245 1246 // loop to keep reading incoming messages on this transport. 1247 for { 1248 t.controlBuf.throttle() 1249 frame, err := t.framer.fr.ReadFrame() 1250 if t.keepaliveEnabled { 1251 atomic.CompareAndSwapUint32(&t.activity, 0, 1) 1252 } 1253 if err != nil { 1254 // Abort an active stream if the http2.Framer returns a 1255 // http2.StreamError. This can happen only if the server's response 1256 // is malformed http2. 1257 if se, ok := err.(http2.StreamError); ok { 1258 t.mu.Lock() 1259 s := t.activeStreams[se.StreamID] 1260 t.mu.Unlock() 1261 if s != nil { 1262 // use error detail to provide better err message 1263 code := http2ErrConvTab[se.Code] 1264 msg := t.framer.fr.ErrorDetail().Error() 1265 t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false) 1266 } 1267 continue 1268 } else { 1269 // Transport error. 1270 t.Close() 1271 return 1272 } 1273 } 1274 switch frame := frame.(type) { 1275 case *http2.MetaHeadersFrame: 1276 t.operateHeaders(frame) 1277 case *http2.DataFrame: 1278 t.handleData(frame) 1279 case *http2.RSTStreamFrame: 1280 t.handleRSTStream(frame) 1281 case *http2.SettingsFrame: 1282 t.handleSettings(frame, false) 1283 case *http2.PingFrame: 1284 t.handlePing(frame) 1285 case *http2.GoAwayFrame: 1286 t.handleGoAway(frame) 1287 case *http2.WindowUpdateFrame: 1288 t.handleWindowUpdate(frame) 1289 default: 1290 errorf("transport: http2Client.reader got unhandled frame type %v.", frame) 1291 } 1292 } 1293} 1294 1295// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. 1296func (t *http2Client) keepalive() { 1297 p := &ping{data: [8]byte{}} 1298 timer := time.NewTimer(t.kp.Time) 1299 for { 1300 select { 1301 case <-timer.C: 1302 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { 1303 timer.Reset(t.kp.Time) 1304 continue 1305 } 1306 // Check if keepalive should go dormant. 1307 t.mu.Lock() 1308 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { 1309 // Make awakenKeepalive writable. 1310 <-t.awakenKeepalive 1311 t.mu.Unlock() 1312 select { 1313 case <-t.awakenKeepalive: 1314 // If the control gets here a ping has been sent 1315 // need to reset the timer with keepalive.Timeout. 1316 case <-t.ctx.Done(): 1317 return 1318 } 1319 } else { 1320 t.mu.Unlock() 1321 if channelz.IsOn() { 1322 atomic.AddInt64(&t.czData.kpCount, 1) 1323 } 1324 // Send ping. 1325 t.controlBuf.put(p) 1326 } 1327 1328 // By the time control gets here a ping has been sent one way or the other. 1329 timer.Reset(t.kp.Timeout) 1330 select { 1331 case <-timer.C: 1332 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { 1333 timer.Reset(t.kp.Time) 1334 continue 1335 } 1336 infof("transport: closing client transport due to idleness.") 1337 t.Close() 1338 return 1339 case <-t.ctx.Done(): 1340 if !timer.Stop() { 1341 <-timer.C 1342 } 1343 return 1344 } 1345 case <-t.ctx.Done(): 1346 if !timer.Stop() { 1347 <-timer.C 1348 } 1349 return 1350 } 1351 } 1352} 1353 1354func (t *http2Client) Error() <-chan struct{} { 1355 return t.ctx.Done() 1356} 1357 1358func (t *http2Client) GoAway() <-chan struct{} { 1359 return t.goAway 1360} 1361 1362func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric { 1363 s := channelz.SocketInternalMetric{ 1364 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), 1365 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), 1366 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), 1367 MessagesSent: atomic.LoadInt64(&t.czData.msgSent), 1368 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), 1369 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), 1370 LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), 1371 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), 1372 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), 1373 LocalFlowControlWindow: int64(t.fc.getSize()), 1374 SocketOptions: channelz.GetSocketOption(t.conn), 1375 LocalAddr: t.localAddr, 1376 RemoteAddr: t.remoteAddr, 1377 // RemoteName : 1378 } 1379 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { 1380 s.Security = au.GetSecurityValue() 1381 } 1382 s.RemoteFlowControlWindow = t.getOutFlowWindow() 1383 return &s 1384} 1385 1386func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr } 1387 1388func (t *http2Client) IncrMsgSent() { 1389 atomic.AddInt64(&t.czData.msgSent, 1) 1390 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) 1391} 1392 1393func (t *http2Client) IncrMsgRecv() { 1394 atomic.AddInt64(&t.czData.msgRecv, 1) 1395 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) 1396} 1397 1398func (t *http2Client) getOutFlowWindow() int64 { 1399 resp := make(chan uint32, 1) 1400 timer := time.NewTimer(time.Second) 1401 defer timer.Stop() 1402 t.controlBuf.put(&outFlowControlSizeRequest{resp}) 1403 select { 1404 case sz := <-resp: 1405 return int64(sz) 1406 case <-t.ctxDone: 1407 return -1 1408 case <-timer.C: 1409 return -2 1410 } 1411} 1412