/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"

	spb "google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
	// than the limit set by peer.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
	// statusRawProto is a function to get to the raw status proto wrapped in a
	// status.Status without a proto.Clone().
	statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)

// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	ctx         context.Context
	done        chan struct{}
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Flag to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters

	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31-1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	bufferPool *bufferPool
}

// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
//
// The function writes the server's SETTINGS (and, if needed, a connection
// WINDOW_UPDATE), fills in keepalive defaults, reads and validates the client
// preface and its first SETTINGS frame, and finally starts the loopy writer
// and keepalive goroutines. An io.EOF or io.ErrUnexpectedEOF hit while reading
// the first frame is returned as-is rather than wrapped.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// dynamicWindow stays true only when neither window size was configured
	// at or above defaultWindowSize; only then is a bdpEstimator installed
	// further down.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Zero-valued keepalive parameters are replaced by package defaults.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	done := make(chan struct{})
	t := &http2Server{
		ctx:               context.Background(),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}
	// Push the settings (and optional window update) written above out to
	// the client before waiting for its preface.
	t.framer.writer.Flush()

	// From here on, any error returned through the named result err closes
	// the transport. Failures returned before this point do not call Close.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	// The first frame after the preface must be a SETTINGS frame.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreUint32(&t.activity, 1)
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// Writer goroutine: runs loopy until it returns, then closes the
	// connection and signals writerDone.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}

// operateHeaders takes action on the decoded headers.
301func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { 302 streamID := frame.Header().StreamID 303 state := &decodeState{ 304 serverSide: true, 305 } 306 if err := state.decodeHeader(frame); err != nil { 307 if se, ok := status.FromError(err); ok { 308 t.controlBuf.put(&cleanupStream{ 309 streamID: streamID, 310 rst: true, 311 rstCode: statusCodeConvTab[se.Code()], 312 onWrite: func() {}, 313 }) 314 } 315 return false 316 } 317 318 buf := newRecvBuffer() 319 s := &Stream{ 320 id: streamID, 321 st: t, 322 buf: buf, 323 fc: &inFlow{limit: uint32(t.initialWindowSize)}, 324 recvCompress: state.data.encoding, 325 method: state.data.method, 326 contentSubtype: state.data.contentSubtype, 327 } 328 if frame.StreamEnded() { 329 // s is just created by the caller. No lock needed. 330 s.state = streamReadDone 331 } 332 if state.data.timeoutSet { 333 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout) 334 } else { 335 s.ctx, s.cancel = context.WithCancel(t.ctx) 336 } 337 pr := &peer.Peer{ 338 Addr: t.remoteAddr, 339 } 340 // Attach Auth info if there is any. 341 if t.authInfo != nil { 342 pr.AuthInfo = t.authInfo 343 } 344 s.ctx = peer.NewContext(s.ctx, pr) 345 // Attach the received metadata to the context. 
346 if len(state.data.mdata) > 0 { 347 s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata) 348 } 349 if state.data.statsTags != nil { 350 s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags) 351 } 352 if state.data.statsTrace != nil { 353 s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace) 354 } 355 if t.inTapHandle != nil { 356 var err error 357 info := &tap.Info{ 358 FullMethodName: state.data.method, 359 } 360 s.ctx, err = t.inTapHandle(s.ctx, info) 361 if err != nil { 362 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) 363 t.controlBuf.put(&cleanupStream{ 364 streamID: s.id, 365 rst: true, 366 rstCode: http2.ErrCodeRefusedStream, 367 onWrite: func() {}, 368 }) 369 s.cancel() 370 return false 371 } 372 } 373 t.mu.Lock() 374 if t.state != reachable { 375 t.mu.Unlock() 376 s.cancel() 377 return false 378 } 379 if uint32(len(t.activeStreams)) >= t.maxStreams { 380 t.mu.Unlock() 381 t.controlBuf.put(&cleanupStream{ 382 streamID: streamID, 383 rst: true, 384 rstCode: http2.ErrCodeRefusedStream, 385 onWrite: func() {}, 386 }) 387 s.cancel() 388 return false 389 } 390 if streamID%2 != 1 || streamID <= t.maxStreamID { 391 t.mu.Unlock() 392 // illegal gRPC stream id. 
393 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) 394 s.cancel() 395 return true 396 } 397 t.maxStreamID = streamID 398 t.activeStreams[streamID] = s 399 if len(t.activeStreams) == 1 { 400 t.idle = time.Time{} 401 } 402 t.mu.Unlock() 403 if channelz.IsOn() { 404 atomic.AddInt64(&t.czData.streamsStarted, 1) 405 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) 406 } 407 s.requestRead = func(n int) { 408 t.adjustWindow(s, uint32(n)) 409 } 410 s.ctx = traceCtx(s.ctx, s.method) 411 if t.stats != nil { 412 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) 413 inHeader := &stats.InHeader{ 414 FullMethod: s.method, 415 RemoteAddr: t.remoteAddr, 416 LocalAddr: t.localAddr, 417 Compression: s.recvCompress, 418 WireLength: int(frame.Header().Length), 419 } 420 t.stats.HandleRPC(s.ctx, inHeader) 421 } 422 s.ctxDone = s.ctx.Done() 423 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) 424 s.trReader = &transportReader{ 425 reader: &recvBufferReader{ 426 ctx: s.ctx, 427 ctxDone: s.ctxDone, 428 recv: s.buf, 429 freeBuffer: t.bufferPool.put, 430 }, 431 windowHandler: func(n int) { 432 t.updateWindow(s, uint32(n)) 433 }, 434 } 435 // Register the stream with loopy. 436 t.controlBuf.put(®isterStream{ 437 streamID: s.id, 438 wq: s.wq, 439 }) 440 handle(s) 441 return false 442} 443 444// HandleStreams receives incoming streams using the given handler. This is 445// typically run in a separate goroutine. 446// traceCtx attaches trace to ctx and returns the new context. 
447func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { 448 defer close(t.readerDone) 449 for { 450 t.controlBuf.throttle() 451 frame, err := t.framer.fr.ReadFrame() 452 atomic.StoreUint32(&t.activity, 1) 453 if err != nil { 454 if se, ok := err.(http2.StreamError); ok { 455 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se) 456 t.mu.Lock() 457 s := t.activeStreams[se.StreamID] 458 t.mu.Unlock() 459 if s != nil { 460 t.closeStream(s, true, se.Code, false) 461 } else { 462 t.controlBuf.put(&cleanupStream{ 463 streamID: se.StreamID, 464 rst: true, 465 rstCode: se.Code, 466 onWrite: func() {}, 467 }) 468 } 469 continue 470 } 471 if err == io.EOF || err == io.ErrUnexpectedEOF { 472 t.Close() 473 return 474 } 475 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) 476 t.Close() 477 return 478 } 479 switch frame := frame.(type) { 480 case *http2.MetaHeadersFrame: 481 if t.operateHeaders(frame, handle, traceCtx) { 482 t.Close() 483 break 484 } 485 case *http2.DataFrame: 486 t.handleData(frame) 487 case *http2.RSTStreamFrame: 488 t.handleRSTStream(frame) 489 case *http2.SettingsFrame: 490 t.handleSettings(frame) 491 case *http2.PingFrame: 492 t.handlePing(frame) 493 case *http2.WindowUpdateFrame: 494 t.handleWindowUpdate(frame) 495 case *http2.GoAwayFrame: 496 // TODO: Handle GoAway from the client appropriately. 497 default: 498 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) 499 } 500 } 501} 502 503func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { 504 t.mu.Lock() 505 defer t.mu.Unlock() 506 if t.activeStreams == nil { 507 // The transport is closing. 508 return nil, false 509 } 510 s, ok := t.activeStreams[f.Header().StreamID] 511 if !ok { 512 // The stream is already done. 
513 return nil, false 514 } 515 return s, true 516} 517 518// adjustWindow sends out extra window update over the initial window size 519// of stream if the application is requesting data larger in size than 520// the window. 521func (t *http2Server) adjustWindow(s *Stream, n uint32) { 522 if w := s.fc.maybeAdjust(n); w > 0 { 523 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 524 } 525 526} 527 528// updateWindow adjusts the inbound quota for the stream and the transport. 529// Window updates will deliver to the controller for sending when 530// the cumulative quota exceeds the corresponding threshold. 531func (t *http2Server) updateWindow(s *Stream, n uint32) { 532 if w := s.fc.onRead(n); w > 0 { 533 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, 534 increment: w, 535 }) 536 } 537} 538 539// updateFlowControl updates the incoming flow control windows 540// for the transport and the stream based on the current bdp 541// estimation. 542func (t *http2Server) updateFlowControl(n uint32) { 543 t.mu.Lock() 544 for _, s := range t.activeStreams { 545 s.fc.newLimit(n) 546 } 547 t.initialWindowSize = int32(n) 548 t.mu.Unlock() 549 t.controlBuf.put(&outgoingWindowUpdate{ 550 streamID: 0, 551 increment: t.fc.newLimit(n), 552 }) 553 t.controlBuf.put(&outgoingSettings{ 554 ss: []http2.Setting{ 555 { 556 ID: http2.SettingInitialWindowSize, 557 Val: n, 558 }, 559 }, 560 }) 561 562} 563 564func (t *http2Server) handleData(f *http2.DataFrame) { 565 size := f.Header().Length 566 var sendBDPPing bool 567 if t.bdpEst != nil { 568 sendBDPPing = t.bdpEst.add(size) 569 } 570 // Decouple connection's flow control from application's read. 571 // An update on connection's flow control should not depend on 572 // whether user application has read the data or not. Such a 573 // restriction is already imposed on the stream's flow control, 574 // and therefore the sender will be blocked anyways. 
575 // Decoupling the connection flow control will prevent other 576 // active(fast) streams from starving in presence of slow or 577 // inactive streams. 578 if w := t.fc.onData(size); w > 0 { 579 t.controlBuf.put(&outgoingWindowUpdate{ 580 streamID: 0, 581 increment: w, 582 }) 583 } 584 if sendBDPPing { 585 // Avoid excessive ping detection (e.g. in an L7 proxy) 586 // by sending a window update prior to the BDP ping. 587 if w := t.fc.reset(); w > 0 { 588 t.controlBuf.put(&outgoingWindowUpdate{ 589 streamID: 0, 590 increment: w, 591 }) 592 } 593 t.controlBuf.put(bdpPing) 594 } 595 // Select the right stream to dispatch. 596 s, ok := t.getStream(f) 597 if !ok { 598 return 599 } 600 if size > 0 { 601 if err := s.fc.onData(size); err != nil { 602 t.closeStream(s, true, http2.ErrCodeFlowControl, false) 603 return 604 } 605 if f.Header().Flags.Has(http2.FlagDataPadded) { 606 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { 607 t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) 608 } 609 } 610 // TODO(bradfitz, zhaoq): A copy is required here because there is no 611 // guarantee f.Data() is consumed before the arrival of next frame. 612 // Can this copy be eliminated? 613 if len(f.Data()) > 0 { 614 buffer := t.bufferPool.get() 615 buffer.Reset() 616 buffer.Write(f.Data()) 617 s.write(recvMsg{buffer: buffer}) 618 } 619 } 620 if f.Header().Flags.Has(http2.FlagDataEndStream) { 621 // Received the end of stream from the client. 622 s.compareAndSwapState(streamActive, streamReadDone) 623 s.write(recvMsg{err: io.EOF}) 624 } 625} 626 627func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { 628 // If the stream is not deleted from the transport's active streams map, then do a regular close stream. 
629 if s, ok := t.getStream(f); ok { 630 t.closeStream(s, false, 0, false) 631 return 632 } 633 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map. 634 t.controlBuf.put(&cleanupStream{ 635 streamID: f.Header().StreamID, 636 rst: false, 637 rstCode: 0, 638 onWrite: func() {}, 639 }) 640} 641 642func (t *http2Server) handleSettings(f *http2.SettingsFrame) { 643 if f.IsAck() { 644 return 645 } 646 var ss []http2.Setting 647 var updateFuncs []func() 648 f.ForeachSetting(func(s http2.Setting) error { 649 switch s.ID { 650 case http2.SettingMaxHeaderListSize: 651 updateFuncs = append(updateFuncs, func() { 652 t.maxSendHeaderListSize = new(uint32) 653 *t.maxSendHeaderListSize = s.Val 654 }) 655 default: 656 ss = append(ss, s) 657 } 658 return nil 659 }) 660 t.controlBuf.executeAndPut(func(interface{}) bool { 661 for _, f := range updateFuncs { 662 f() 663 } 664 return true 665 }, &incomingSettings{ 666 ss: ss, 667 }) 668} 669 670const ( 671 maxPingStrikes = 2 672 defaultPingTimeout = 2 * time.Hour 673) 674 675func (t *http2Server) handlePing(f *http2.PingFrame) { 676 if f.IsAck() { 677 if f.Data == goAwayPing.data && t.drainChan != nil { 678 close(t.drainChan) 679 return 680 } 681 // Maybe it's a BDP ping. 682 if t.bdpEst != nil { 683 t.bdpEst.calculate(f.Data) 684 } 685 return 686 } 687 pingAck := &ping{ack: true} 688 copy(pingAck.data[:], f.Data[:]) 689 t.controlBuf.put(pingAck) 690 691 now := time.Now() 692 defer func() { 693 t.lastPingAt = now 694 }() 695 // A reset ping strikes means that we don't need to check for policy 696 // violation for this ping and the pingStrikes counter should be set 697 // to 0. 
698 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { 699 t.pingStrikes = 0 700 return 701 } 702 t.mu.Lock() 703 ns := len(t.activeStreams) 704 t.mu.Unlock() 705 if ns < 1 && !t.kep.PermitWithoutStream { 706 // Keepalive shouldn't be active thus, this new ping should 707 // have come after at least defaultPingTimeout. 708 if t.lastPingAt.Add(defaultPingTimeout).After(now) { 709 t.pingStrikes++ 710 } 711 } else { 712 // Check if keepalive policy is respected. 713 if t.lastPingAt.Add(t.kep.MinTime).After(now) { 714 t.pingStrikes++ 715 } 716 } 717 718 if t.pingStrikes > maxPingStrikes { 719 // Send goaway and close the connection. 720 errorf("transport: Got too many pings from the client, closing the connection.") 721 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) 722 } 723} 724 725func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { 726 t.controlBuf.put(&incomingWindowUpdate{ 727 streamID: f.Header().StreamID, 728 increment: f.Increment, 729 }) 730} 731 732func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField { 733 for k, vv := range md { 734 if isReservedHeader(k) { 735 // Clients don't tolerate reading restricted headers after some non restricted ones were sent. 
736 continue 737 } 738 for _, v := range vv { 739 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 740 } 741 } 742 return headerFields 743} 744 745func (t *http2Server) checkForHeaderListSize(it interface{}) bool { 746 if t.maxSendHeaderListSize == nil { 747 return true 748 } 749 hdrFrame := it.(*headerFrame) 750 var sz int64 751 for _, f := range hdrFrame.hf { 752 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { 753 errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) 754 return false 755 } 756 } 757 return true 758} 759 760// WriteHeader sends the header metadata md back to the client. 761func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { 762 if s.updateHeaderSent() || s.getState() == streamDone { 763 return ErrIllegalHeaderWrite 764 } 765 s.hdrMu.Lock() 766 if md.Len() > 0 { 767 if s.header.Len() > 0 { 768 s.header = metadata.Join(s.header, md) 769 } else { 770 s.header = md 771 } 772 } 773 if err := t.writeHeaderLocked(s); err != nil { 774 s.hdrMu.Unlock() 775 return err 776 } 777 s.hdrMu.Unlock() 778 return nil 779} 780 781func (t *http2Server) setResetPingStrikes() { 782 atomic.StoreUint32(&t.resetPingStrikes, 1) 783} 784 785func (t *http2Server) writeHeaderLocked(s *Stream) error { 786 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 787 // first and create a slice of that exact size. 788 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. 
789 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) 790 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) 791 if s.sendCompress != "" { 792 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) 793 } 794 headerFields = appendHeaderFieldsFromMD(headerFields, s.header) 795 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{ 796 streamID: s.id, 797 hf: headerFields, 798 endStream: false, 799 onWrite: t.setResetPingStrikes, 800 }) 801 if !success { 802 if err != nil { 803 return err 804 } 805 t.closeStream(s, true, http2.ErrCodeInternal, false) 806 return ErrHeaderListSizeLimitViolation 807 } 808 if t.stats != nil { 809 // Note: WireLength is not set in outHeader. 810 // TODO(mmukhi): Revisit this later, if needed. 811 outHeader := &stats.OutHeader{} 812 t.stats.HandleRPC(s.Context(), outHeader) 813 } 814 return nil 815} 816 817// WriteStatus sends stream status to the client and terminates the stream. 818// There is no further I/O operations being able to perform on this stream. 819// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early 820// OK is adopted. 821func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { 822 if s.getState() == streamDone { 823 return nil 824 } 825 s.hdrMu.Lock() 826 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 827 // first and create a slice of that exact size. 828 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. 829 if !s.updateHeaderSent() { // No headers have been sent. 830 if len(s.header) > 0 { // Send a separate header frame. 831 if err := t.writeHeaderLocked(s); err != nil { 832 s.hdrMu.Unlock() 833 return err 834 } 835 } else { // Send a trailer only response. 
836 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) 837 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) 838 } 839 } 840 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) 841 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) 842 843 if p := statusRawProto(st); p != nil && len(p.Details) > 0 { 844 stBytes, err := proto.Marshal(p) 845 if err != nil { 846 // TODO: return error instead, when callers are able to handle it. 847 grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err) 848 } else { 849 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) 850 } 851 } 852 853 // Attach the trailer metadata. 854 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer) 855 trailingHeader := &headerFrame{ 856 streamID: s.id, 857 hf: headerFields, 858 endStream: true, 859 onWrite: t.setResetPingStrikes, 860 } 861 s.hdrMu.Unlock() 862 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) 863 if !success { 864 if err != nil { 865 return err 866 } 867 t.closeStream(s, true, http2.ErrCodeInternal, false) 868 return ErrHeaderListSizeLimitViolation 869 } 870 // Send a RST_STREAM after the trailers if the client has not already half-closed. 871 rst := s.getState() == streamActive 872 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true) 873 if t.stats != nil { 874 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) 875 } 876 return nil 877} 878 879// Write converts the data into HTTP2 data frame and sends it out. Non-nil error 880// is returns if it fails (e.g., framing error, transport error). 
881func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { 882 if !s.isHeaderSent() { // Headers haven't been written yet. 883 if err := t.WriteHeader(s, nil); err != nil { 884 if _, ok := err.(ConnectionError); ok { 885 return err 886 } 887 // TODO(mmukhi, dfawley): Make sure this is the right code to return. 888 return status.Errorf(codes.Internal, "transport: %v", err) 889 } 890 } else { 891 // Writing headers checks for this condition. 892 if s.getState() == streamDone { 893 // TODO(mmukhi, dfawley): Should the server write also return io.EOF? 894 s.cancel() 895 select { 896 case <-t.done: 897 return ErrConnClosing 898 default: 899 } 900 return ContextErr(s.ctx.Err()) 901 } 902 } 903 // Add some data to header frame so that we can equally distribute bytes across frames. 904 emptyLen := http2MaxFrameLen - len(hdr) 905 if emptyLen > len(data) { 906 emptyLen = len(data) 907 } 908 hdr = append(hdr, data[:emptyLen]...) 909 data = data[emptyLen:] 910 df := &dataFrame{ 911 streamID: s.id, 912 h: hdr, 913 d: data, 914 onEachWrite: t.setResetPingStrikes, 915 } 916 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { 917 select { 918 case <-t.done: 919 return ErrConnClosing 920 default: 921 } 922 return ContextErr(s.ctx.Err()) 923 } 924 return t.controlBuf.put(df) 925} 926 927// keepalive running in a separate goroutine does the following: 928// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. 929// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. 930// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. 931// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection 932// after an additional duration of keepalive.Timeout. 
func (t *http2Server) keepalive() {
	p := &ping{}
	// pingSent is true while a keepalive ping has been queued and no read
	// activity has been observed since.
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	defer func() {
		// Stop returns false when the timer has already fired; the
		// receive then drains the pending value.
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// val is the idle time still remaining before the limit is hit.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			maxIdle.Reset(val)
		case <-maxAge.C:
			t.drain(http2.ErrCodeNo, []byte{})
			// Reuse the same timer for the grace period that follows the
			// graceful drain.
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				infof("transport: closing server transport due to maximum connection age.")
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.done:
			}
			return
		case <-keepalive.C:
			// Any read activity since the last tick clears the flag and
			// restarts the keepalive interval.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			if pingSent {
				infof("transport: closing server transport due to idleness.")
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			pingSent = true
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.kpCount, 1)
			}
			t.controlBuf.put(p)
			keepalive.Reset(t.kp.Timeout)
		case <-t.done:
			return
		}
	}
}

// Close starts shutting down the http2Server transport.
// It returns an error if Close was already called; otherwise it returns the
// result of closing the underlying connection.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() error {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return errors.New("transport: Close() was already called")
	}
	t.state = closing
	// Detach the stream map under the lock; the streams are cancelled below
	// without holding it.
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	err := t.conn.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(t.channelzID)
	}
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
	if t.stats != nil {
		connEnd := &stats.ConnEnd{}
		t.stats.HandleConn(t.ctx, connEnd)
	}
	return err
}

// deleteStream deletes the stream s from transport's active streams.
// eosReceived selects which channelz counter is bumped: streamsSucceeded when
// true, streamsFailed otherwise.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()

	t.mu.Lock()
	if _, ok := t.activeStreams[s.id]; ok {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			// Last active stream gone: record when the connection went idle.
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()

	if channelz.IsOn() {
		if eosReceived {
			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
		} else {
			atomic.AddInt64(&t.czData.streamsFailed, 1)
		}
	}
}

// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1068func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { 1069 oldState := s.swapState(streamDone) 1070 if oldState == streamDone { 1071 // If the stream was already done, return. 1072 return 1073 } 1074 1075 hdr.cleanup = &cleanupStream{ 1076 streamID: s.id, 1077 rst: rst, 1078 rstCode: rstCode, 1079 onWrite: func() { 1080 t.deleteStream(s, eosReceived) 1081 }, 1082 } 1083 t.controlBuf.put(hdr) 1084} 1085 1086// closeStream clears the footprint of a stream when the stream is not needed any more. 1087func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) { 1088 s.swapState(streamDone) 1089 t.deleteStream(s, eosReceived) 1090 1091 t.controlBuf.put(&cleanupStream{ 1092 streamID: s.id, 1093 rst: rst, 1094 rstCode: rstCode, 1095 onWrite: func() {}, 1096 }) 1097} 1098 1099func (t *http2Server) RemoteAddr() net.Addr { 1100 return t.remoteAddr 1101} 1102 1103func (t *http2Server) Drain() { 1104 t.drain(http2.ErrCodeNo, []byte{}) 1105} 1106 1107func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { 1108 t.mu.Lock() 1109 defer t.mu.Unlock() 1110 if t.drainChan != nil { 1111 return 1112 } 1113 t.drainChan = make(chan struct{}) 1114 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) 1115} 1116 1117var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} 1118 1119// Handles outgoing GoAway and returns true if loopy needs to put itself 1120// in draining mode. 1121func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { 1122 t.mu.Lock() 1123 if t.state == closing { // TODO(mmukhi): This seems unnecessary. 1124 t.mu.Unlock() 1125 // The transport is closing. 1126 return false, ErrConnClosing 1127 } 1128 sid := t.maxStreamID 1129 if !g.headsUp { 1130 // Stop accepting more streams now. 
1131 t.state = draining 1132 if len(t.activeStreams) == 0 { 1133 g.closeConn = true 1134 } 1135 t.mu.Unlock() 1136 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { 1137 return false, err 1138 } 1139 if g.closeConn { 1140 // Abruptly close the connection following the GoAway (via 1141 // loopywriter). But flush out what's inside the buffer first. 1142 t.framer.writer.Flush() 1143 return false, fmt.Errorf("transport: Connection closing") 1144 } 1145 return true, nil 1146 } 1147 t.mu.Unlock() 1148 // For a graceful close, send out a GoAway with stream ID of MaxUInt32, 1149 // Follow that with a ping and wait for the ack to come back or a timer 1150 // to expire. During this time accept new streams since they might have 1151 // originated before the GoAway reaches the client. 1152 // After getting the ack or timer expiration send out another GoAway this 1153 // time with an ID of the max stream server intends to process. 1154 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { 1155 return false, err 1156 } 1157 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { 1158 return false, err 1159 } 1160 go func() { 1161 timer := time.NewTimer(time.Minute) 1162 defer timer.Stop() 1163 select { 1164 case <-t.drainChan: 1165 case <-timer.C: 1166 case <-t.done: 1167 return 1168 } 1169 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData}) 1170 }() 1171 return false, nil 1172} 1173 1174func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { 1175 s := channelz.SocketInternalMetric{ 1176 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), 1177 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), 1178 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), 1179 MessagesSent: atomic.LoadInt64(&t.czData.msgSent), 1180 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), 1181 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), 1182 
LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), 1183 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), 1184 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), 1185 LocalFlowControlWindow: int64(t.fc.getSize()), 1186 SocketOptions: channelz.GetSocketOption(t.conn), 1187 LocalAddr: t.localAddr, 1188 RemoteAddr: t.remoteAddr, 1189 // RemoteName : 1190 } 1191 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { 1192 s.Security = au.GetSecurityValue() 1193 } 1194 s.RemoteFlowControlWindow = t.getOutFlowWindow() 1195 return &s 1196} 1197 1198func (t *http2Server) IncrMsgSent() { 1199 atomic.AddInt64(&t.czData.msgSent, 1) 1200 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) 1201} 1202 1203func (t *http2Server) IncrMsgRecv() { 1204 atomic.AddInt64(&t.czData.msgRecv, 1) 1205 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) 1206} 1207 1208func (t *http2Server) getOutFlowWindow() int64 { 1209 resp := make(chan uint32, 1) 1210 timer := time.NewTimer(time.Second) 1211 defer timer.Stop() 1212 t.controlBuf.put(&outFlowControlSizeRequest{resp}) 1213 select { 1214 case sz := <-resp: 1215 return int64(sz) 1216 case <-t.done: 1217 return -1 1218 case <-timer.C: 1219 return -2 1220 } 1221} 1222 1223func getJitter(v time.Duration) time.Duration { 1224 if v == infinity { 1225 return 0 1226 } 1227 // Generate a jitter between +/- 10% of the value. 1228 r := int64(v / 10) 1229 j := grpcrand.Int63n(2*r) - r 1230 return time.Duration(j) 1231} 1232