1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "context" 23 "fmt" 24 "io" 25 "math" 26 "net" 27 "strconv" 28 "strings" 29 "sync" 30 "sync/atomic" 31 "time" 32 33 "golang.org/x/net/http2" 34 "golang.org/x/net/http2/hpack" 35 36 "google.golang.org/grpc/codes" 37 "google.golang.org/grpc/credentials" 38 "google.golang.org/grpc/internal/channelz" 39 "google.golang.org/grpc/internal/syscall" 40 "google.golang.org/grpc/keepalive" 41 "google.golang.org/grpc/metadata" 42 "google.golang.org/grpc/peer" 43 "google.golang.org/grpc/stats" 44 "google.golang.org/grpc/status" 45) 46 47// http2Client implements the ClientTransport interface with HTTP2. 48type http2Client struct { 49 ctx context.Context 50 cancel context.CancelFunc 51 ctxDone <-chan struct{} // Cache the ctx.Done() chan. 52 userAgent string 53 md interface{} 54 conn net.Conn // underlying communication channel 55 loopy *loopyWriter 56 remoteAddr net.Addr 57 localAddr net.Addr 58 authInfo credentials.AuthInfo // auth info about the connection 59 60 readerDone chan struct{} // sync point to enable testing. 61 writerDone chan struct{} // sync point to enable testing. 62 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) 63 // that the server sent GoAway on this transport. 64 goAway chan struct{} 65 // awakenKeepalive is used to wake up keepalive when after it has gone dormant. 66 awakenKeepalive chan struct{} 67 68 framer *framer 69 // controlBuf delivers all the control related tasks (e.g., window 70 // updates, reset streams, and various settings) to the controller. 71 controlBuf *controlBuffer 72 fc *trInFlow 73 // The scheme used: https if TLS is on, http otherwise. 74 scheme string 75 76 isSecure bool 77 78 perRPCCreds []credentials.PerRPCCredentials 79 80 // Boolean to keep track of reading activity on transport. 81 // 1 is true and 0 is false. 82 activity uint32 // Accessed atomically. 83 kp keepalive.ClientParameters 84 keepaliveEnabled bool 85 86 statsHandler stats.Handler 87 88 initialWindowSize int32 89 90 // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE 91 maxSendHeaderListSize *uint32 92 93 bdpEst *bdpEstimator 94 // onPrefaceReceipt is a callback that client transport calls upon 95 // receiving server preface to signal that a succefull HTTP2 96 // connection was established. 97 onPrefaceReceipt func() 98 99 maxConcurrentStreams uint32 100 streamQuota int64 101 streamsQuotaAvailable chan struct{} 102 waitingStreams uint32 103 nextID uint32 104 105 mu sync.Mutex // guard the following variables 106 state transportState 107 activeStreams map[uint32]*Stream 108 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. 109 prevGoAwayID uint32 110 // goAwayReason records the http2.ErrCode and debug data received with the 111 // GoAway frame. 112 goAwayReason GoAwayReason 113 114 // Fields below are for channelz metric collection. 115 channelzID int64 // channelz unique identification number 116 czData *channelzData 117 118 onGoAway func(GoAwayReason) 119 onClose func() 120 121 bufferPool *bufferPool 122} 123 124func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { 125 if fn != nil { 126 return fn(ctx, addr) 127 } 128 return (&net.Dialer{}).DialContext(ctx, "tcp", addr) 129} 130 131func isTemporary(err error) bool { 132 switch err := err.(type) { 133 case interface { 134 Temporary() bool 135 }: 136 return err.Temporary() 137 case interface { 138 Timeout() bool 139 }: 140 // Timeouts may be resolved upon retry, and are thus treated as 141 // temporary. 142 return err.Timeout() 143 } 144 return true 145} 146 147// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 148// and starts to receive messages on it. Non-nil error returns if construction 149// fails. 150func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) { 151 scheme := "http" 152 ctx, cancel := context.WithCancel(ctx) 153 defer func() { 154 if err != nil { 155 cancel() 156 } 157 }() 158 159 conn, err := dial(connectCtx, opts.Dialer, addr.Addr) 160 if err != nil { 161 if opts.FailOnNonTempDialError { 162 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) 163 } 164 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) 165 } 166 // Any further errors will close the underlying connection 167 defer func(conn net.Conn) { 168 if err != nil { 169 conn.Close() 170 } 171 }(conn) 172 kp := opts.KeepaliveParams 173 // Validate keepalive parameters. 174 if kp.Time == 0 { 175 kp.Time = defaultClientKeepaliveTime 176 } 177 if kp.Timeout == 0 { 178 kp.Timeout = defaultClientKeepaliveTimeout 179 } 180 keepaliveEnabled := false 181 if kp.Time != infinity { 182 if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil { 183 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err) 184 } 185 keepaliveEnabled = true 186 } 187 var ( 188 isSecure bool 189 authInfo credentials.AuthInfo 190 ) 191 transportCreds := opts.TransportCredentials 192 perRPCCreds := opts.PerRPCCredentials 193 194 if b := opts.CredsBundle; b != nil { 195 if t := b.TransportCredentials(); t != nil { 196 transportCreds = t 197 } 198 if t := b.PerRPCCredentials(); t != nil { 199 perRPCCreds = append(perRPCCreds, t) 200 } 201 } 202 if transportCreds != nil { 203 scheme = "https" 204 conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn) 205 if err != nil { 206 return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err) 207 } 208 isSecure = true 209 } 210 dynamicWindow := true 211 icwz := int32(initialWindowSize) 212 if opts.InitialConnWindowSize >= defaultWindowSize { 213 icwz = opts.InitialConnWindowSize 214 dynamicWindow = false 215 } 216 writeBufSize := opts.WriteBufferSize 217 readBufSize := opts.ReadBufferSize 218 maxHeaderListSize := defaultClientMaxHeaderListSize 219 if opts.MaxHeaderListSize != nil { 220 maxHeaderListSize = *opts.MaxHeaderListSize 221 } 222 t := &http2Client{ 223 ctx: ctx, 224 ctxDone: ctx.Done(), // Cache Done chan. 225 cancel: cancel, 226 userAgent: opts.UserAgent, 227 md: addr.Metadata, 228 conn: conn, 229 remoteAddr: conn.RemoteAddr(), 230 localAddr: conn.LocalAddr(), 231 authInfo: authInfo, 232 readerDone: make(chan struct{}), 233 writerDone: make(chan struct{}), 234 goAway: make(chan struct{}), 235 awakenKeepalive: make(chan struct{}, 1), 236 framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize), 237 fc: &trInFlow{limit: uint32(icwz)}, 238 scheme: scheme, 239 activeStreams: make(map[uint32]*Stream), 240 isSecure: isSecure, 241 perRPCCreds: perRPCCreds, 242 kp: kp, 243 statsHandler: opts.StatsHandler, 244 initialWindowSize: initialWindowSize, 245 onPrefaceReceipt: onPrefaceReceipt, 246 nextID: 1, 247 maxConcurrentStreams: defaultMaxStreamsClient, 248 streamQuota: defaultMaxStreamsClient, 249 streamsQuotaAvailable: make(chan struct{}, 1), 250 czData: new(channelzData), 251 onGoAway: onGoAway, 252 onClose: onClose, 253 keepaliveEnabled: keepaliveEnabled, 254 bufferPool: newBufferPool(), 255 } 256 t.controlBuf = newControlBuffer(t.ctxDone) 257 if opts.InitialWindowSize >= defaultWindowSize { 258 t.initialWindowSize = opts.InitialWindowSize 259 dynamicWindow = false 260 } 261 if dynamicWindow { 262 t.bdpEst = &bdpEstimator{ 263 bdp: initialWindowSize, 264 updateFlowControl: t.updateFlowControl, 265 } 266 } 267 // Make sure awakenKeepalive can't be written upon. 268 // keepalive routine will make it writable, if need be. 269 t.awakenKeepalive <- struct{}{} 270 if t.statsHandler != nil { 271 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ 272 RemoteAddr: t.remoteAddr, 273 LocalAddr: t.localAddr, 274 }) 275 connBegin := &stats.ConnBegin{ 276 Client: true, 277 } 278 t.statsHandler.HandleConn(t.ctx, connBegin) 279 } 280 if channelz.IsOn() { 281 t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) 282 } 283 if t.keepaliveEnabled { 284 go t.keepalive() 285 } 286 // Start the reader goroutine for incoming message. Each transport has 287 // a dedicated goroutine which reads HTTP2 frame from network. Then it 288 // dispatches the frame to the corresponding stream entity. 289 go t.reader() 290 291 // Send connection preface to server. 292 n, err := t.conn.Write(clientPreface) 293 if err != nil { 294 t.Close() 295 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) 296 } 297 if n != len(clientPreface) { 298 t.Close() 299 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) 300 } 301 var ss []http2.Setting 302 303 if t.initialWindowSize != defaultWindowSize { 304 ss = append(ss, http2.Setting{ 305 ID: http2.SettingInitialWindowSize, 306 Val: uint32(t.initialWindowSize), 307 }) 308 } 309 if opts.MaxHeaderListSize != nil { 310 ss = append(ss, http2.Setting{ 311 ID: http2.SettingMaxHeaderListSize, 312 Val: *opts.MaxHeaderListSize, 313 }) 314 } 315 err = t.framer.fr.WriteSettings(ss...) 316 if err != nil { 317 t.Close() 318 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) 319 } 320 // Adjust the connection flow control window if needed. 321 if delta := uint32(icwz - defaultWindowSize); delta > 0 { 322 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil { 323 t.Close() 324 return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) 325 } 326 } 327 328 if err := t.framer.writer.Flush(); err != nil { 329 return nil, err 330 } 331 go func() { 332 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst) 333 err := t.loopy.run() 334 if err != nil { 335 errorf("transport: loopyWriter.run returning. Err: %v", err) 336 } 337 // If it's a connection error, let reader goroutine handle it 338 // since there might be data in the buffers. 339 if _, ok := err.(net.Error); !ok { 340 t.conn.Close() 341 } 342 close(t.writerDone) 343 }() 344 return t, nil 345} 346 347func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { 348 // TODO(zhaoq): Handle uint32 overflow of Stream.id. 349 s := &Stream{ 350 done: make(chan struct{}), 351 method: callHdr.Method, 352 sendCompress: callHdr.SendCompress, 353 buf: newRecvBuffer(), 354 headerChan: make(chan struct{}), 355 contentSubtype: callHdr.ContentSubtype, 356 } 357 s.wq = newWriteQuota(defaultWriteQuota, s.done) 358 s.requestRead = func(n int) { 359 t.adjustWindow(s, uint32(n)) 360 } 361 // The client side stream context should have exactly the same life cycle with the user provided context. 362 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. 363 // So we use the original context here instead of creating a copy. 364 s.ctx = ctx 365 s.trReader = &transportReader{ 366 reader: &recvBufferReader{ 367 ctx: s.ctx, 368 ctxDone: s.ctx.Done(), 369 recv: s.buf, 370 closeStream: func(err error) { 371 t.CloseStream(s, err) 372 }, 373 freeBuffer: t.bufferPool.put, 374 }, 375 windowHandler: func(n int) { 376 t.updateWindow(s, uint32(n)) 377 }, 378 } 379 return s 380} 381 382func (t *http2Client) getPeer() *peer.Peer { 383 pr := &peer.Peer{ 384 Addr: t.remoteAddr, 385 } 386 // Attach Auth info if there is any. 387 if t.authInfo != nil { 388 pr.AuthInfo = t.authInfo 389 } 390 return pr 391} 392 393func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) { 394 aud := t.createAudience(callHdr) 395 authData, err := t.getTrAuthData(ctx, aud) 396 if err != nil { 397 return nil, err 398 } 399 callAuthData, err := t.getCallAuthData(ctx, aud, callHdr) 400 if err != nil { 401 return nil, err 402 } 403 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 404 // first and create a slice of that exact size. 405 // Make the slice of certain predictable size to reduce allocations made by append. 406 hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te 407 hfLen += len(authData) + len(callAuthData) 408 headerFields := make([]hpack.HeaderField, 0, hfLen) 409 headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"}) 410 headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme}) 411 headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method}) 412 headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) 413 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)}) 414 headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) 415 headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"}) 416 if callHdr.PreviousAttempts > 0 { 417 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)}) 418 } 419 420 if callHdr.SendCompress != "" { 421 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) 422 } 423 if dl, ok := ctx.Deadline(); ok { 424 // Send out timeout regardless its value. The server can detect timeout context by itself. 425 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire. 426 timeout := time.Until(dl) 427 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) 428 } 429 for k, v := range authData { 430 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 431 } 432 for k, v := range callAuthData { 433 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 434 } 435 if b := stats.OutgoingTags(ctx); b != nil { 436 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)}) 437 } 438 if b := stats.OutgoingTrace(ctx); b != nil { 439 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)}) 440 } 441 442 if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { 443 var k string 444 for k, vv := range md { 445 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. 446 if isReservedHeader(k) { 447 continue 448 } 449 for _, v := range vv { 450 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 451 } 452 } 453 for _, vv := range added { 454 for i, v := range vv { 455 if i%2 == 0 { 456 k = v 457 continue 458 } 459 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. 460 if isReservedHeader(k) { 461 continue 462 } 463 headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)}) 464 } 465 } 466 } 467 if md, ok := t.md.(*metadata.MD); ok { 468 for k, vv := range *md { 469 if isReservedHeader(k) { 470 continue 471 } 472 for _, v := range vv { 473 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 474 } 475 } 476 } 477 return headerFields, nil 478} 479 480func (t *http2Client) createAudience(callHdr *CallHdr) string { 481 // Create an audience string only if needed. 482 if len(t.perRPCCreds) == 0 && callHdr.Creds == nil { 483 return "" 484 } 485 // Construct URI required to get auth request metadata. 486 // Omit port if it is the default one. 487 host := strings.TrimSuffix(callHdr.Host, ":443") 488 pos := strings.LastIndex(callHdr.Method, "/") 489 if pos == -1 { 490 pos = len(callHdr.Method) 491 } 492 return "https://" + host + callHdr.Method[:pos] 493} 494 495func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) { 496 authData := map[string]string{} 497 for _, c := range t.perRPCCreds { 498 data, err := c.GetRequestMetadata(ctx, audience) 499 if err != nil { 500 if _, ok := status.FromError(err); ok { 501 return nil, err 502 } 503 504 return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err) 505 } 506 for k, v := range data { 507 // Capital header names are illegal in HTTP/2. 508 k = strings.ToLower(k) 509 authData[k] = v 510 } 511 } 512 return authData, nil 513} 514 515func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) { 516 callAuthData := map[string]string{} 517 // Check if credentials.PerRPCCredentials were provided via call options. 518 // Note: if these credentials are provided both via dial options and call 519 // options, then both sets of credentials will be applied. 520 if callCreds := callHdr.Creds; callCreds != nil { 521 if !t.isSecure && callCreds.RequireTransportSecurity() { 522 return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection") 523 } 524 data, err := callCreds.GetRequestMetadata(ctx, audience) 525 if err != nil { 526 return nil, status.Errorf(codes.Internal, "transport: %v", err) 527 } 528 for k, v := range data { 529 // Capital header names are illegal in HTTP/2 530 k = strings.ToLower(k) 531 callAuthData[k] = v 532 } 533 } 534 return callAuthData, nil 535} 536 537// NewStream creates a stream and registers it into the transport as "active" 538// streams. 539func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { 540 ctx = peer.NewContext(ctx, t.getPeer()) 541 headerFields, err := t.createHeaderFields(ctx, callHdr) 542 if err != nil { 543 return nil, err 544 } 545 s := t.newStream(ctx, callHdr) 546 cleanup := func(err error) { 547 if s.swapState(streamDone) == streamDone { 548 // If it was already done, return. 549 return 550 } 551 // The stream was unprocessed by the server. 552 atomic.StoreUint32(&s.unprocessed, 1) 553 s.write(recvMsg{err: err}) 554 close(s.done) 555 // If headerChan isn't closed, then close it. 556 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { 557 close(s.headerChan) 558 } 559 560 } 561 hdr := &headerFrame{ 562 hf: headerFields, 563 endStream: false, 564 initStream: func(id uint32) (bool, error) { 565 t.mu.Lock() 566 if state := t.state; state != reachable { 567 t.mu.Unlock() 568 // Do a quick cleanup. 569 err := error(errStreamDrain) 570 if state == closing { 571 err = ErrConnClosing 572 } 573 cleanup(err) 574 return false, err 575 } 576 t.activeStreams[id] = s 577 if channelz.IsOn() { 578 atomic.AddInt64(&t.czData.streamsStarted, 1) 579 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) 580 } 581 var sendPing bool 582 // If the number of active streams change from 0 to 1, then check if keepalive 583 // has gone dormant. If so, wake it up. 584 if len(t.activeStreams) == 1 && t.keepaliveEnabled { 585 select { 586 case t.awakenKeepalive <- struct{}{}: 587 sendPing = true 588 // Fill the awakenKeepalive channel again as this channel must be 589 // kept non-writable except at the point that the keepalive() 590 // goroutine is waiting either to be awaken or shutdown. 591 t.awakenKeepalive <- struct{}{} 592 default: 593 } 594 } 595 t.mu.Unlock() 596 return sendPing, nil 597 }, 598 onOrphaned: cleanup, 599 wq: s.wq, 600 } 601 firstTry := true 602 var ch chan struct{} 603 checkForStreamQuota := func(it interface{}) bool { 604 if t.streamQuota <= 0 { // Can go negative if server decreases it. 605 if firstTry { 606 t.waitingStreams++ 607 } 608 ch = t.streamsQuotaAvailable 609 return false 610 } 611 if !firstTry { 612 t.waitingStreams-- 613 } 614 t.streamQuota-- 615 h := it.(*headerFrame) 616 h.streamID = t.nextID 617 t.nextID += 2 618 s.id = h.streamID 619 s.fc = &inFlow{limit: uint32(t.initialWindowSize)} 620 if t.streamQuota > 0 && t.waitingStreams > 0 { 621 select { 622 case t.streamsQuotaAvailable <- struct{}{}: 623 default: 624 } 625 } 626 return true 627 } 628 var hdrListSizeErr error 629 checkForHeaderListSize := func(it interface{}) bool { 630 if t.maxSendHeaderListSize == nil { 631 return true 632 } 633 hdrFrame := it.(*headerFrame) 634 var sz int64 635 for _, f := range hdrFrame.hf { 636 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { 637 hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize) 638 return false 639 } 640 } 641 return true 642 } 643 for { 644 success, err := t.controlBuf.executeAndPut(func(it interface{}) bool { 645 if !checkForStreamQuota(it) { 646 return false 647 } 648 if !checkForHeaderListSize(it) { 649 return false 650 } 651 return true 652 }, hdr) 653 if err != nil { 654 return nil, err 655 } 656 if success { 657 break 658 } 659 if hdrListSizeErr != nil { 660 return nil, hdrListSizeErr 661 } 662 firstTry = false 663 select { 664 case <-ch: 665 case <-s.ctx.Done(): 666 return nil, ContextErr(s.ctx.Err()) 667 case <-t.goAway: 668 return nil, errStreamDrain 669 case <-t.ctx.Done(): 670 return nil, ErrConnClosing 671 } 672 } 673 if t.statsHandler != nil { 674 outHeader := &stats.OutHeader{ 675 Client: true, 676 FullMethod: callHdr.Method, 677 RemoteAddr: t.remoteAddr, 678 LocalAddr: t.localAddr, 679 Compression: callHdr.SendCompress, 680 } 681 t.statsHandler.HandleRPC(s.ctx, outHeader) 682 } 683 return s, nil 684} 685 686// CloseStream clears the footprint of a stream when the stream is not needed any more. 687// This must not be executed in reader's goroutine. 688func (t *http2Client) CloseStream(s *Stream, err error) { 689 var ( 690 rst bool 691 rstCode http2.ErrCode 692 ) 693 if err != nil { 694 rst = true 695 rstCode = http2.ErrCodeCancel 696 } 697 t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false) 698} 699 700func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) { 701 // Set stream status to done. 702 if s.swapState(streamDone) == streamDone { 703 // If it was already done, return. If multiple closeStream calls 704 // happen simultaneously, wait for the first to finish. 705 <-s.done 706 return 707 } 708 // status and trailers can be updated here without any synchronization because the stream goroutine will 709 // only read it after it sees an io.EOF error from read or write and we'll write those errors 710 // only after updating this. 711 s.status = st 712 if len(mdata) > 0 { 713 s.trailer = mdata 714 } 715 if err != nil { 716 // This will unblock reads eventually. 717 s.write(recvMsg{err: err}) 718 } 719 // If headerChan isn't closed, then close it. 720 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { 721 s.noHeaders = true 722 close(s.headerChan) 723 } 724 cleanup := &cleanupStream{ 725 streamID: s.id, 726 onWrite: func() { 727 t.mu.Lock() 728 if t.activeStreams != nil { 729 delete(t.activeStreams, s.id) 730 } 731 t.mu.Unlock() 732 if channelz.IsOn() { 733 if eosReceived { 734 atomic.AddInt64(&t.czData.streamsSucceeded, 1) 735 } else { 736 atomic.AddInt64(&t.czData.streamsFailed, 1) 737 } 738 } 739 }, 740 rst: rst, 741 rstCode: rstCode, 742 } 743 addBackStreamQuota := func(interface{}) bool { 744 t.streamQuota++ 745 if t.streamQuota > 0 && t.waitingStreams > 0 { 746 select { 747 case t.streamsQuotaAvailable <- struct{}{}: 748 default: 749 } 750 } 751 return true 752 } 753 t.controlBuf.executeAndPut(addBackStreamQuota, cleanup) 754 // This will unblock write. 755 close(s.done) 756} 757 758// Close kicks off the shutdown process of the transport. This should be called 759// only once on a transport. Once it is called, the transport should not be 760// accessed any more. 761// 762// This method blocks until the addrConn that initiated this transport is 763// re-connected. This happens because t.onClose() begins reconnect logic at the 764// addrConn level and blocks until the addrConn is successfully connected. 765func (t *http2Client) Close() error { 766 t.mu.Lock() 767 // Make sure we only Close once. 768 if t.state == closing { 769 t.mu.Unlock() 770 return nil 771 } 772 t.state = closing 773 streams := t.activeStreams 774 t.activeStreams = nil 775 t.mu.Unlock() 776 t.controlBuf.finish() 777 t.cancel() 778 err := t.conn.Close() 779 if channelz.IsOn() { 780 channelz.RemoveEntry(t.channelzID) 781 } 782 // Notify all active streams. 783 for _, s := range streams { 784 t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false) 785 } 786 if t.statsHandler != nil { 787 connEnd := &stats.ConnEnd{ 788 Client: true, 789 } 790 t.statsHandler.HandleConn(t.ctx, connEnd) 791 } 792 t.onClose() 793 return err 794} 795 796// GracefulClose sets the state to draining, which prevents new streams from 797// being created and causes the transport to be closed when the last active 798// stream is closed. If there are no active streams, the transport is closed 799// immediately. This does nothing if the transport is already draining or 800// closing. 801func (t *http2Client) GracefulClose() { 802 t.mu.Lock() 803 // Make sure we move to draining only from active. 804 if t.state == draining || t.state == closing { 805 t.mu.Unlock() 806 return 807 } 808 t.state = draining 809 active := len(t.activeStreams) 810 t.mu.Unlock() 811 if active == 0 { 812 t.Close() 813 return 814 } 815 t.controlBuf.put(&incomingGoAway{}) 816} 817 818// Write formats the data into HTTP2 data frame(s) and sends it out. The caller 819// should proceed only if Write returns nil. 820func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { 821 if opts.Last { 822 // If it's the last message, update stream state. 823 if !s.compareAndSwapState(streamActive, streamWriteDone) { 824 return errStreamDone 825 } 826 } else if s.getState() != streamActive { 827 return errStreamDone 828 } 829 df := &dataFrame{ 830 streamID: s.id, 831 endStream: opts.Last, 832 } 833 if hdr != nil || data != nil { // If it's not an empty data frame. 834 // Add some data to grpc message header so that we can equally 835 // distribute bytes across frames. 836 emptyLen := http2MaxFrameLen - len(hdr) 837 if emptyLen > len(data) { 838 emptyLen = len(data) 839 } 840 hdr = append(hdr, data[:emptyLen]...) 841 data = data[emptyLen:] 842 df.h, df.d = hdr, data 843 // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler. 844 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { 845 return err 846 } 847 } 848 return t.controlBuf.put(df) 849} 850 851func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { 852 t.mu.Lock() 853 defer t.mu.Unlock() 854 s, ok := t.activeStreams[f.Header().StreamID] 855 return s, ok 856} 857 858// adjustWindow sends out extra window update over the initial window size 859// of stream if the application is requesting data larger in size than 860// the window. 861func (t *http2Client) adjustWindow(s *Stream, n uint32) { 862 if w := s.fc.maybeAdjust(n); w > 0 { 863 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 864 } 865} 866 867// updateWindow adjusts the inbound quota for the stream. 868// Window updates will be sent out when the cumulative quota 869// exceeds the corresponding threshold. 870func (t *http2Client) updateWindow(s *Stream, n uint32) { 871 if w := s.fc.onRead(n); w > 0 { 872 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 873 } 874} 875 876// updateFlowControl updates the incoming flow control windows 877// for the transport and the stream based on the current bdp 878// estimation. 879func (t *http2Client) updateFlowControl(n uint32) { 880 t.mu.Lock() 881 for _, s := range t.activeStreams { 882 s.fc.newLimit(n) 883 } 884 t.mu.Unlock() 885 updateIWS := func(interface{}) bool { 886 t.initialWindowSize = int32(n) 887 return true 888 } 889 t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)}) 890 t.controlBuf.put(&outgoingSettings{ 891 ss: []http2.Setting{ 892 { 893 ID: http2.SettingInitialWindowSize, 894 Val: n, 895 }, 896 }, 897 }) 898} 899 900func (t *http2Client) handleData(f *http2.DataFrame) { 901 size := f.Header().Length 902 var sendBDPPing bool 903 if t.bdpEst != nil { 904 sendBDPPing = t.bdpEst.add(size) 905 } 906 // Decouple connection's flow control from application's read. 907 // An update on connection's flow control should not depend on 908 // whether user application has read the data or not. Such a 909 // restriction is already imposed on the stream's flow control, 910 // and therefore the sender will be blocked anyways. 911 // Decoupling the connection flow control will prevent other 912 // active(fast) streams from starving in presence of slow or 913 // inactive streams. 914 // 915 if w := t.fc.onData(size); w > 0 { 916 t.controlBuf.put(&outgoingWindowUpdate{ 917 streamID: 0, 918 increment: w, 919 }) 920 } 921 if sendBDPPing { 922 // Avoid excessive ping detection (e.g. in an L7 proxy) 923 // by sending a window update prior to the BDP ping. 924 925 if w := t.fc.reset(); w > 0 { 926 t.controlBuf.put(&outgoingWindowUpdate{ 927 streamID: 0, 928 increment: w, 929 }) 930 } 931 932 t.controlBuf.put(bdpPing) 933 } 934 // Select the right stream to dispatch. 935 s, ok := t.getStream(f) 936 if !ok { 937 return 938 } 939 if size > 0 { 940 if err := s.fc.onData(size); err != nil { 941 t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false) 942 return 943 } 944 if f.Header().Flags.Has(http2.FlagDataPadded) { 945 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { 946 t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) 947 } 948 } 949 // TODO(bradfitz, zhaoq): A copy is required here because there is no 950 // guarantee f.Data() is consumed before the arrival of next frame. 951 // Can this copy be eliminated? 952 if len(f.Data()) > 0 { 953 buffer := t.bufferPool.get() 954 buffer.Reset() 955 buffer.Write(f.Data()) 956 s.write(recvMsg{buffer: buffer}) 957 } 958 } 959 // The server has closed the stream without sending trailers. Record that 960 // the read direction is closed, and set the status appropriately. 961 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { 962 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true) 963 } 964} 965 966func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { 967 s, ok := t.getStream(f) 968 if !ok { 969 return 970 } 971 if f.ErrCode == http2.ErrCodeRefusedStream { 972 // The stream was unprocessed by the server. 973 atomic.StoreUint32(&s.unprocessed, 1) 974 } 975 statusCode, ok := http2ErrConvTab[f.ErrCode] 976 if !ok { 977 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode) 978 statusCode = codes.Unknown 979 } 980 if statusCode == codes.Canceled { 981 // Our deadline was already exceeded, and that was likely the cause of 982 // this cancelation. Alter the status code accordingly. 983 if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) { 984 statusCode = codes.DeadlineExceeded 985 } 986 } 987 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false) 988} 989 990func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) { 991 if f.IsAck() { 992 return 993 } 994 var maxStreams *uint32 995 var ss []http2.Setting 996 var updateFuncs []func() 997 f.ForeachSetting(func(s http2.Setting) error { 998 switch s.ID { 999 case http2.SettingMaxConcurrentStreams: 1000 maxStreams = new(uint32) 1001 *maxStreams = s.Val 1002 case http2.SettingMaxHeaderListSize: 1003 updateFuncs = append(updateFuncs, func() { 1004 t.maxSendHeaderListSize = new(uint32) 1005 *t.maxSendHeaderListSize = s.Val 1006 }) 1007 default: 1008 ss = append(ss, s) 1009 } 1010 return nil 1011 }) 1012 if isFirst && maxStreams == nil { 1013 maxStreams = new(uint32) 1014 *maxStreams = math.MaxUint32 1015 } 1016 sf := &incomingSettings{ 1017 ss: ss, 1018 } 1019 if maxStreams != nil { 1020 updateStreamQuota := func() { 1021 delta := int64(*maxStreams) - int64(t.maxConcurrentStreams) 1022 t.maxConcurrentStreams = *maxStreams 1023 t.streamQuota += delta 1024 if delta > 0 && t.waitingStreams > 0 { 1025 close(t.streamsQuotaAvailable) // wake all of them up. 1026 t.streamsQuotaAvailable = make(chan struct{}, 1) 1027 } 1028 } 1029 updateFuncs = append(updateFuncs, updateStreamQuota) 1030 } 1031 t.controlBuf.executeAndPut(func(interface{}) bool { 1032 for _, f := range updateFuncs { 1033 f() 1034 } 1035 return true 1036 }, sf) 1037} 1038 1039func (t *http2Client) handlePing(f *http2.PingFrame) { 1040 if f.IsAck() { 1041 // Maybe it's a BDP ping. 1042 if t.bdpEst != nil { 1043 t.bdpEst.calculate(f.Data) 1044 } 1045 return 1046 } 1047 pingAck := &ping{ack: true} 1048 copy(pingAck.data[:], f.Data[:]) 1049 t.controlBuf.put(pingAck) 1050} 1051 1052func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { 1053 t.mu.Lock() 1054 if t.state == closing { 1055 t.mu.Unlock() 1056 return 1057 } 1058 if f.ErrCode == http2.ErrCodeEnhanceYourCalm { 1059 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") 1060 } 1061 id := f.LastStreamID 1062 if id > 0 && id%2 != 1 { 1063 t.mu.Unlock() 1064 t.Close() 1065 return 1066 } 1067 // A client can receive multiple GoAways from the server (see 1068 // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first 1069 // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be 1070 // sent after an RTT delay with the ID of the last stream the server will 1071 // process. 1072 // 1073 // Therefore, when we get the first GoAway we don't necessarily close any 1074 // streams. While in case of second GoAway we close all streams created after 1075 // the GoAwayId. This way streams that were in-flight while the GoAway from 1076 // server was being sent don't get killed. 1077 select { 1078 case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways). 1079 // If there are multiple GoAways the first one should always have an ID greater than the following ones. 1080 if id > t.prevGoAwayID { 1081 t.mu.Unlock() 1082 t.Close() 1083 return 1084 } 1085 default: 1086 t.setGoAwayReason(f) 1087 close(t.goAway) 1088 t.state = draining 1089 t.controlBuf.put(&incomingGoAway{}) 1090 1091 // This has to be a new goroutine because we're still using the current goroutine to read in the transport. 1092 t.onGoAway(t.goAwayReason) 1093 } 1094 // All streams with IDs greater than the GoAwayId 1095 // and smaller than the previous GoAway ID should be killed. 1096 upperLimit := t.prevGoAwayID 1097 if upperLimit == 0 { // This is the first GoAway Frame. 1098 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. 1099 } 1100 for streamID, stream := range t.activeStreams { 1101 if streamID > id && streamID <= upperLimit { 1102 // The stream was unprocessed by the server. 1103 atomic.StoreUint32(&stream.unprocessed, 1) 1104 t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) 1105 } 1106 } 1107 t.prevGoAwayID = id 1108 active := len(t.activeStreams) 1109 t.mu.Unlock() 1110 if active == 0 { 1111 t.Close() 1112 } 1113} 1114 1115// setGoAwayReason sets the value of t.goAwayReason based 1116// on the GoAway frame received. 1117// It expects a lock on transport's mutext to be held by 1118// the caller. 1119func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { 1120 t.goAwayReason = GoAwayNoReason 1121 switch f.ErrCode { 1122 case http2.ErrCodeEnhanceYourCalm: 1123 if string(f.DebugData()) == "too_many_pings" { 1124 t.goAwayReason = GoAwayTooManyPings 1125 } 1126 } 1127} 1128 1129func (t *http2Client) GetGoAwayReason() GoAwayReason { 1130 t.mu.Lock() 1131 defer t.mu.Unlock() 1132 return t.goAwayReason 1133} 1134 1135func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { 1136 t.controlBuf.put(&incomingWindowUpdate{ 1137 streamID: f.Header().StreamID, 1138 increment: f.Increment, 1139 }) 1140} 1141 1142// operateHeaders takes action on the decoded headers. 1143func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { 1144 s, ok := t.getStream(frame) 1145 if !ok { 1146 return 1147 } 1148 endStream := frame.StreamEnded() 1149 atomic.StoreUint32(&s.bytesReceived, 1) 1150 initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0 1151 1152 if !initialHeader && !endStream { 1153 // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set. 1154 st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream") 1155 t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false) 1156 return 1157 } 1158 1159 state := &decodeState{} 1160 // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. 1161 state.data.isGRPC = !initialHeader 1162 if err := state.decodeHeader(frame); err != nil { 1163 t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) 1164 return 1165 } 1166 1167 isHeader := false 1168 defer func() { 1169 if t.statsHandler != nil { 1170 if isHeader { 1171 inHeader := &stats.InHeader{ 1172 Client: true, 1173 WireLength: int(frame.Header().Length), 1174 } 1175 t.statsHandler.HandleRPC(s.ctx, inHeader) 1176 } else { 1177 inTrailer := &stats.InTrailer{ 1178 Client: true, 1179 WireLength: int(frame.Header().Length), 1180 } 1181 t.statsHandler.HandleRPC(s.ctx, inTrailer) 1182 } 1183 } 1184 }() 1185 1186 // If headerChan hasn't been closed yet 1187 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { 1188 if !endStream { 1189 // HEADERS frame block carries a Response-Headers. 1190 isHeader = true 1191 // These values can be set without any synchronization because 1192 // stream goroutine will read it only after seeing a closed 1193 // headerChan which we'll close after setting this. 1194 s.recvCompress = state.data.encoding 1195 if len(state.data.mdata) > 0 { 1196 s.header = state.data.mdata 1197 } 1198 } else { 1199 // HEADERS frame block carries a Trailers-Only. 1200 s.noHeaders = true 1201 } 1202 close(s.headerChan) 1203 } 1204 1205 if !endStream { 1206 return 1207 } 1208 1209 // if client received END_STREAM from server while stream was still active, send RST_STREAM 1210 rst := s.getState() == streamActive 1211 t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true) 1212} 1213 1214// reader runs as a separate goroutine in charge of reading data from network 1215// connection. 1216// 1217// TODO(zhaoq): currently one reader per transport. Investigate whether this is 1218// optimal. 1219// TODO(zhaoq): Check the validity of the incoming frame sequence. 1220func (t *http2Client) reader() { 1221 defer close(t.readerDone) 1222 // Check the validity of server preface. 1223 frame, err := t.framer.fr.ReadFrame() 1224 if err != nil { 1225 t.Close() // this kicks off resetTransport, so must be last before return 1226 return 1227 } 1228 t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!) 1229 if t.keepaliveEnabled { 1230 atomic.CompareAndSwapUint32(&t.activity, 0, 1) 1231 } 1232 sf, ok := frame.(*http2.SettingsFrame) 1233 if !ok { 1234 t.Close() // this kicks off resetTransport, so must be last before return 1235 return 1236 } 1237 t.onPrefaceReceipt() 1238 t.handleSettings(sf, true) 1239 1240 // loop to keep reading incoming messages on this transport. 1241 for { 1242 frame, err := t.framer.fr.ReadFrame() 1243 if t.keepaliveEnabled { 1244 atomic.CompareAndSwapUint32(&t.activity, 0, 1) 1245 } 1246 if err != nil { 1247 // Abort an active stream if the http2.Framer returns a 1248 // http2.StreamError. This can happen only if the server's response 1249 // is malformed http2. 1250 if se, ok := err.(http2.StreamError); ok { 1251 t.mu.Lock() 1252 s := t.activeStreams[se.StreamID] 1253 t.mu.Unlock() 1254 if s != nil { 1255 // use error detail to provide better err message 1256 code := http2ErrConvTab[se.Code] 1257 msg := t.framer.fr.ErrorDetail().Error() 1258 t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false) 1259 } 1260 continue 1261 } else { 1262 // Transport error. 1263 t.Close() 1264 return 1265 } 1266 } 1267 switch frame := frame.(type) { 1268 case *http2.MetaHeadersFrame: 1269 t.operateHeaders(frame) 1270 case *http2.DataFrame: 1271 t.handleData(frame) 1272 case *http2.RSTStreamFrame: 1273 t.handleRSTStream(frame) 1274 case *http2.SettingsFrame: 1275 t.handleSettings(frame, false) 1276 case *http2.PingFrame: 1277 t.handlePing(frame) 1278 case *http2.GoAwayFrame: 1279 t.handleGoAway(frame) 1280 case *http2.WindowUpdateFrame: 1281 t.handleWindowUpdate(frame) 1282 default: 1283 errorf("transport: http2Client.reader got unhandled frame type %v.", frame) 1284 } 1285 } 1286} 1287 1288// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. 1289func (t *http2Client) keepalive() { 1290 p := &ping{data: [8]byte{}} 1291 timer := time.NewTimer(t.kp.Time) 1292 for { 1293 select { 1294 case <-timer.C: 1295 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { 1296 timer.Reset(t.kp.Time) 1297 continue 1298 } 1299 // Check if keepalive should go dormant. 1300 t.mu.Lock() 1301 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { 1302 // Make awakenKeepalive writable. 1303 <-t.awakenKeepalive 1304 t.mu.Unlock() 1305 select { 1306 case <-t.awakenKeepalive: 1307 // If the control gets here a ping has been sent 1308 // need to reset the timer with keepalive.Timeout. 1309 case <-t.ctx.Done(): 1310 return 1311 } 1312 } else { 1313 t.mu.Unlock() 1314 if channelz.IsOn() { 1315 atomic.AddInt64(&t.czData.kpCount, 1) 1316 } 1317 // Send ping. 1318 t.controlBuf.put(p) 1319 } 1320 1321 // By the time control gets here a ping has been sent one way or the other. 1322 timer.Reset(t.kp.Timeout) 1323 select { 1324 case <-timer.C: 1325 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { 1326 timer.Reset(t.kp.Time) 1327 continue 1328 } 1329 t.Close() 1330 return 1331 case <-t.ctx.Done(): 1332 if !timer.Stop() { 1333 <-timer.C 1334 } 1335 return 1336 } 1337 case <-t.ctx.Done(): 1338 if !timer.Stop() { 1339 <-timer.C 1340 } 1341 return 1342 } 1343 } 1344} 1345 1346func (t *http2Client) Error() <-chan struct{} { 1347 return t.ctx.Done() 1348} 1349 1350func (t *http2Client) GoAway() <-chan struct{} { 1351 return t.goAway 1352} 1353 1354func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric { 1355 s := channelz.SocketInternalMetric{ 1356 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), 1357 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), 1358 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), 1359 MessagesSent: atomic.LoadInt64(&t.czData.msgSent), 1360 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), 1361 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), 1362 LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), 1363 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), 1364 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), 1365 LocalFlowControlWindow: int64(t.fc.getSize()), 1366 SocketOptions: channelz.GetSocketOption(t.conn), 1367 LocalAddr: t.localAddr, 1368 RemoteAddr: t.remoteAddr, 1369 // RemoteName : 1370 } 1371 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { 1372 s.Security = au.GetSecurityValue() 1373 } 1374 s.RemoteFlowControlWindow = t.getOutFlowWindow() 1375 return &s 1376} 1377 1378func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr } 1379 1380func (t *http2Client) IncrMsgSent() { 1381 atomic.AddInt64(&t.czData.msgSent, 1) 1382 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) 1383} 1384 1385func (t *http2Client) IncrMsgRecv() { 1386 atomic.AddInt64(&t.czData.msgRecv, 1) 1387 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) 1388} 1389 1390func (t *http2Client) getOutFlowWindow() int64 { 1391 resp := make(chan uint32, 1) 1392 timer := time.NewTimer(time.Second) 1393 defer timer.Stop() 1394 t.controlBuf.put(&outFlowControlSizeRequest{resp}) 1395 select { 1396 case sz := <-resp: 1397 return int64(sz) 1398 case <-t.ctxDone: 1399 return -1 1400 case <-timer.C: 1401 return -2 1402 } 1403} 1404