/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/internal/grpcutil"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state: WriteHeader returns it when the stream is already
	// done or its headers have already been sent.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
	// than the limit set by peer (the client's SETTINGS_MAX_HEADER_LIST_SIZE).
	// It is returned when a header or trailer frame fails that size check.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
)

// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.
// Each new transport takes the incremented value as its connectionID.
62var serverConnectionCounter uint64 63 64// http2Server implements the ServerTransport interface with HTTP2. 65type http2Server struct { 66 lastRead int64 // Keep this field 64-bit aligned. Accessed atomically. 67 ctx context.Context 68 done chan struct{} 69 conn net.Conn 70 loopy *loopyWriter 71 readerDone chan struct{} // sync point to enable testing. 72 writerDone chan struct{} // sync point to enable testing. 73 remoteAddr net.Addr 74 localAddr net.Addr 75 maxStreamID uint32 // max stream ID ever seen 76 authInfo credentials.AuthInfo // auth info about the connection 77 inTapHandle tap.ServerInHandle 78 framer *framer 79 // The max number of concurrent streams. 80 maxStreams uint32 81 // controlBuf delivers all the control related tasks (e.g., window 82 // updates, reset streams, and various settings) to the controller. 83 controlBuf *controlBuffer 84 fc *trInFlow 85 stats stats.Handler 86 // Keepalive and max-age parameters for the server. 87 kp keepalive.ServerParameters 88 // Keepalive enforcement policy. 89 kep keepalive.EnforcementPolicy 90 // The time instance last ping was received. 91 lastPingAt time.Time 92 // Number of times the client has violated keepalive ping policy so far. 93 pingStrikes uint8 94 // Flag to signify that number of ping strikes should be reset to 0. 95 // This is set whenever data or header frames are sent. 96 // 1 means yes. 97 resetPingStrikes uint32 // Accessed atomically. 98 initialWindowSize int32 99 bdpEst *bdpEstimator 100 maxSendHeaderListSize *uint32 101 102 mu sync.Mutex // guard the following 103 104 // drainChan is initialized when drain(...) is called the first time. 105 // After which the server writes out the first GoAway(with ID 2^31-1) frame. 106 // Then an independent goroutine will be launched to later send the second GoAway. 107 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. 108 // Thus call to drain(...) 
will be a no-op if drainChan is already initialized since draining is 109 // already underway. 110 drainChan chan struct{} 111 state transportState 112 activeStreams map[uint32]*Stream 113 // idle is the time instant when the connection went idle. 114 // This is either the beginning of the connection or when the number of 115 // RPCs go down to 0. 116 // When the connection is busy, this value is set to 0. 117 idle time.Time 118 119 // Fields below are for channelz metric collection. 120 channelzID int64 // channelz unique identification number 121 czData *channelzData 122 bufferPool *bufferPool 123 124 connectionID uint64 125} 126 127// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is 128// returned if something goes wrong. 129func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { 130 writeBufSize := config.WriteBufferSize 131 readBufSize := config.ReadBufferSize 132 maxHeaderListSize := defaultServerMaxHeaderListSize 133 if config.MaxHeaderListSize != nil { 134 maxHeaderListSize = *config.MaxHeaderListSize 135 } 136 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize) 137 // Send initial settings as connection preface to client. 138 isettings := []http2.Setting{{ 139 ID: http2.SettingMaxFrameSize, 140 Val: http2MaxFrameLen, 141 }} 142 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is 143 // permitted in the HTTP2 spec. 
144 maxStreams := config.MaxStreams 145 if maxStreams == 0 { 146 maxStreams = math.MaxUint32 147 } else { 148 isettings = append(isettings, http2.Setting{ 149 ID: http2.SettingMaxConcurrentStreams, 150 Val: maxStreams, 151 }) 152 } 153 dynamicWindow := true 154 iwz := int32(initialWindowSize) 155 if config.InitialWindowSize >= defaultWindowSize { 156 iwz = config.InitialWindowSize 157 dynamicWindow = false 158 } 159 icwz := int32(initialWindowSize) 160 if config.InitialConnWindowSize >= defaultWindowSize { 161 icwz = config.InitialConnWindowSize 162 dynamicWindow = false 163 } 164 if iwz != defaultWindowSize { 165 isettings = append(isettings, http2.Setting{ 166 ID: http2.SettingInitialWindowSize, 167 Val: uint32(iwz)}) 168 } 169 if config.MaxHeaderListSize != nil { 170 isettings = append(isettings, http2.Setting{ 171 ID: http2.SettingMaxHeaderListSize, 172 Val: *config.MaxHeaderListSize, 173 }) 174 } 175 if config.HeaderTableSize != nil { 176 isettings = append(isettings, http2.Setting{ 177 ID: http2.SettingHeaderTableSize, 178 Val: *config.HeaderTableSize, 179 }) 180 } 181 if err := framer.fr.WriteSettings(isettings...); err != nil { 182 return nil, connectionErrorf(false, err, "transport: %v", err) 183 } 184 // Adjust the connection flow control window if needed. 185 if delta := uint32(icwz - defaultWindowSize); delta > 0 { 186 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil { 187 return nil, connectionErrorf(false, err, "transport: %v", err) 188 } 189 } 190 kp := config.KeepaliveParams 191 if kp.MaxConnectionIdle == 0 { 192 kp.MaxConnectionIdle = defaultMaxConnectionIdle 193 } 194 if kp.MaxConnectionAge == 0 { 195 kp.MaxConnectionAge = defaultMaxConnectionAge 196 } 197 // Add a jitter to MaxConnectionAge. 
198 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) 199 if kp.MaxConnectionAgeGrace == 0 { 200 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace 201 } 202 if kp.Time == 0 { 203 kp.Time = defaultServerKeepaliveTime 204 } 205 if kp.Timeout == 0 { 206 kp.Timeout = defaultServerKeepaliveTimeout 207 } 208 kep := config.KeepalivePolicy 209 if kep.MinTime == 0 { 210 kep.MinTime = defaultKeepalivePolicyMinTime 211 } 212 done := make(chan struct{}) 213 t := &http2Server{ 214 ctx: context.Background(), 215 done: done, 216 conn: conn, 217 remoteAddr: conn.RemoteAddr(), 218 localAddr: conn.LocalAddr(), 219 authInfo: config.AuthInfo, 220 framer: framer, 221 readerDone: make(chan struct{}), 222 writerDone: make(chan struct{}), 223 maxStreams: maxStreams, 224 inTapHandle: config.InTapHandle, 225 fc: &trInFlow{limit: uint32(icwz)}, 226 state: reachable, 227 activeStreams: make(map[uint32]*Stream), 228 stats: config.StatsHandler, 229 kp: kp, 230 idle: time.Now(), 231 kep: kep, 232 initialWindowSize: iwz, 233 czData: new(channelzData), 234 bufferPool: newBufferPool(), 235 } 236 t.controlBuf = newControlBuffer(t.done) 237 if dynamicWindow { 238 t.bdpEst = &bdpEstimator{ 239 bdp: initialWindowSize, 240 updateFlowControl: t.updateFlowControl, 241 } 242 } 243 if t.stats != nil { 244 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ 245 RemoteAddr: t.remoteAddr, 246 LocalAddr: t.localAddr, 247 }) 248 connBegin := &stats.ConnBegin{} 249 t.stats.HandleConn(t.ctx, connBegin) 250 } 251 if channelz.IsOn() { 252 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) 253 } 254 255 t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1) 256 257 t.framer.writer.Flush() 258 259 defer func() { 260 if err != nil { 261 t.Close() 262 } 263 }() 264 265 // Check the validity of client preface. 
266 preface := make([]byte, len(clientPreface)) 267 if _, err := io.ReadFull(t.conn, preface); err != nil { 268 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) 269 } 270 if !bytes.Equal(preface, clientPreface) { 271 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) 272 } 273 274 frame, err := t.framer.fr.ReadFrame() 275 if err == io.EOF || err == io.ErrUnexpectedEOF { 276 return nil, err 277 } 278 if err != nil { 279 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err) 280 } 281 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) 282 sf, ok := frame.(*http2.SettingsFrame) 283 if !ok { 284 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) 285 } 286 t.handleSettings(sf) 287 288 go func() { 289 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst) 290 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler 291 if err := t.loopy.run(); err != nil { 292 if logger.V(logLevel) { 293 logger.Errorf("transport: loopyWriter.run returning. Err: %v", err) 294 } 295 } 296 t.conn.Close() 297 close(t.writerDone) 298 }() 299 go t.keepalive() 300 return t, nil 301} 302 303// operateHeader takes action on the decoded headers. 
304func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { 305 streamID := frame.Header().StreamID 306 state := &decodeState{ 307 serverSide: true, 308 } 309 if h2code, err := state.decodeHeader(frame); err != nil { 310 if _, ok := status.FromError(err); ok { 311 t.controlBuf.put(&cleanupStream{ 312 streamID: streamID, 313 rst: true, 314 rstCode: h2code, 315 onWrite: func() {}, 316 }) 317 } 318 return false 319 } 320 321 buf := newRecvBuffer() 322 s := &Stream{ 323 id: streamID, 324 st: t, 325 buf: buf, 326 fc: &inFlow{limit: uint32(t.initialWindowSize)}, 327 recvCompress: state.data.encoding, 328 method: state.data.method, 329 contentSubtype: state.data.contentSubtype, 330 } 331 if frame.StreamEnded() { 332 // s is just created by the caller. No lock needed. 333 s.state = streamReadDone 334 } 335 if state.data.timeoutSet { 336 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout) 337 } else { 338 s.ctx, s.cancel = context.WithCancel(t.ctx) 339 } 340 pr := &peer.Peer{ 341 Addr: t.remoteAddr, 342 } 343 // Attach Auth info if there is any. 344 if t.authInfo != nil { 345 pr.AuthInfo = t.authInfo 346 } 347 s.ctx = peer.NewContext(s.ctx, pr) 348 // Attach the received metadata to the context. 
349 if len(state.data.mdata) > 0 { 350 s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata) 351 } 352 if state.data.statsTags != nil { 353 s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags) 354 } 355 if state.data.statsTrace != nil { 356 s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace) 357 } 358 if t.inTapHandle != nil { 359 var err error 360 info := &tap.Info{ 361 FullMethodName: state.data.method, 362 } 363 s.ctx, err = t.inTapHandle(s.ctx, info) 364 if err != nil { 365 if logger.V(logLevel) { 366 logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) 367 } 368 t.controlBuf.put(&cleanupStream{ 369 streamID: s.id, 370 rst: true, 371 rstCode: http2.ErrCodeRefusedStream, 372 onWrite: func() {}, 373 }) 374 s.cancel() 375 return false 376 } 377 } 378 t.mu.Lock() 379 if t.state != reachable { 380 t.mu.Unlock() 381 s.cancel() 382 return false 383 } 384 if uint32(len(t.activeStreams)) >= t.maxStreams { 385 t.mu.Unlock() 386 t.controlBuf.put(&cleanupStream{ 387 streamID: streamID, 388 rst: true, 389 rstCode: http2.ErrCodeRefusedStream, 390 onWrite: func() {}, 391 }) 392 s.cancel() 393 return false 394 } 395 if streamID%2 != 1 || streamID <= t.maxStreamID { 396 t.mu.Unlock() 397 // illegal gRPC stream id. 
398 if logger.V(logLevel) { 399 logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) 400 } 401 s.cancel() 402 return true 403 } 404 t.maxStreamID = streamID 405 t.activeStreams[streamID] = s 406 if len(t.activeStreams) == 1 { 407 t.idle = time.Time{} 408 } 409 t.mu.Unlock() 410 if channelz.IsOn() { 411 atomic.AddInt64(&t.czData.streamsStarted, 1) 412 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) 413 } 414 s.requestRead = func(n int) { 415 t.adjustWindow(s, uint32(n)) 416 } 417 s.ctx = traceCtx(s.ctx, s.method) 418 if t.stats != nil { 419 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) 420 inHeader := &stats.InHeader{ 421 FullMethod: s.method, 422 RemoteAddr: t.remoteAddr, 423 LocalAddr: t.localAddr, 424 Compression: s.recvCompress, 425 WireLength: int(frame.Header().Length), 426 Header: metadata.MD(state.data.mdata).Copy(), 427 } 428 t.stats.HandleRPC(s.ctx, inHeader) 429 } 430 s.ctxDone = s.ctx.Done() 431 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) 432 s.trReader = &transportReader{ 433 reader: &recvBufferReader{ 434 ctx: s.ctx, 435 ctxDone: s.ctxDone, 436 recv: s.buf, 437 freeBuffer: t.bufferPool.put, 438 }, 439 windowHandler: func(n int) { 440 t.updateWindow(s, uint32(n)) 441 }, 442 } 443 // Register the stream with loopy. 444 t.controlBuf.put(®isterStream{ 445 streamID: s.id, 446 wq: s.wq, 447 }) 448 handle(s) 449 return false 450} 451 452// HandleStreams receives incoming streams using the given handler. This is 453// typically run in a separate goroutine. 454// traceCtx attaches trace to ctx and returns the new context. 
455func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { 456 defer close(t.readerDone) 457 for { 458 t.controlBuf.throttle() 459 frame, err := t.framer.fr.ReadFrame() 460 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) 461 if err != nil { 462 if se, ok := err.(http2.StreamError); ok { 463 if logger.V(logLevel) { 464 logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se) 465 } 466 t.mu.Lock() 467 s := t.activeStreams[se.StreamID] 468 t.mu.Unlock() 469 if s != nil { 470 t.closeStream(s, true, se.Code, false) 471 } else { 472 t.controlBuf.put(&cleanupStream{ 473 streamID: se.StreamID, 474 rst: true, 475 rstCode: se.Code, 476 onWrite: func() {}, 477 }) 478 } 479 continue 480 } 481 if err == io.EOF || err == io.ErrUnexpectedEOF { 482 t.Close() 483 return 484 } 485 if logger.V(logLevel) { 486 logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) 487 } 488 t.Close() 489 return 490 } 491 switch frame := frame.(type) { 492 case *http2.MetaHeadersFrame: 493 if t.operateHeaders(frame, handle, traceCtx) { 494 t.Close() 495 break 496 } 497 case *http2.DataFrame: 498 t.handleData(frame) 499 case *http2.RSTStreamFrame: 500 t.handleRSTStream(frame) 501 case *http2.SettingsFrame: 502 t.handleSettings(frame) 503 case *http2.PingFrame: 504 t.handlePing(frame) 505 case *http2.WindowUpdateFrame: 506 t.handleWindowUpdate(frame) 507 case *http2.GoAwayFrame: 508 // TODO: Handle GoAway from the client appropriately. 509 default: 510 if logger.V(logLevel) { 511 logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) 512 } 513 } 514 } 515} 516 517func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { 518 t.mu.Lock() 519 defer t.mu.Unlock() 520 if t.activeStreams == nil { 521 // The transport is closing. 
522 return nil, false 523 } 524 s, ok := t.activeStreams[f.Header().StreamID] 525 if !ok { 526 // The stream is already done. 527 return nil, false 528 } 529 return s, true 530} 531 532// adjustWindow sends out extra window update over the initial window size 533// of stream if the application is requesting data larger in size than 534// the window. 535func (t *http2Server) adjustWindow(s *Stream, n uint32) { 536 if w := s.fc.maybeAdjust(n); w > 0 { 537 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 538 } 539 540} 541 542// updateWindow adjusts the inbound quota for the stream and the transport. 543// Window updates will deliver to the controller for sending when 544// the cumulative quota exceeds the corresponding threshold. 545func (t *http2Server) updateWindow(s *Stream, n uint32) { 546 if w := s.fc.onRead(n); w > 0 { 547 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, 548 increment: w, 549 }) 550 } 551} 552 553// updateFlowControl updates the incoming flow control windows 554// for the transport and the stream based on the current bdp 555// estimation. 556func (t *http2Server) updateFlowControl(n uint32) { 557 t.mu.Lock() 558 for _, s := range t.activeStreams { 559 s.fc.newLimit(n) 560 } 561 t.initialWindowSize = int32(n) 562 t.mu.Unlock() 563 t.controlBuf.put(&outgoingWindowUpdate{ 564 streamID: 0, 565 increment: t.fc.newLimit(n), 566 }) 567 t.controlBuf.put(&outgoingSettings{ 568 ss: []http2.Setting{ 569 { 570 ID: http2.SettingInitialWindowSize, 571 Val: n, 572 }, 573 }, 574 }) 575 576} 577 578func (t *http2Server) handleData(f *http2.DataFrame) { 579 size := f.Header().Length 580 var sendBDPPing bool 581 if t.bdpEst != nil { 582 sendBDPPing = t.bdpEst.add(size) 583 } 584 // Decouple connection's flow control from application's read. 585 // An update on connection's flow control should not depend on 586 // whether user application has read the data or not. 
Such a 587 // restriction is already imposed on the stream's flow control, 588 // and therefore the sender will be blocked anyways. 589 // Decoupling the connection flow control will prevent other 590 // active(fast) streams from starving in presence of slow or 591 // inactive streams. 592 if w := t.fc.onData(size); w > 0 { 593 t.controlBuf.put(&outgoingWindowUpdate{ 594 streamID: 0, 595 increment: w, 596 }) 597 } 598 if sendBDPPing { 599 // Avoid excessive ping detection (e.g. in an L7 proxy) 600 // by sending a window update prior to the BDP ping. 601 if w := t.fc.reset(); w > 0 { 602 t.controlBuf.put(&outgoingWindowUpdate{ 603 streamID: 0, 604 increment: w, 605 }) 606 } 607 t.controlBuf.put(bdpPing) 608 } 609 // Select the right stream to dispatch. 610 s, ok := t.getStream(f) 611 if !ok { 612 return 613 } 614 if s.getState() == streamReadDone { 615 t.closeStream(s, true, http2.ErrCodeStreamClosed, false) 616 return 617 } 618 if size > 0 { 619 if err := s.fc.onData(size); err != nil { 620 t.closeStream(s, true, http2.ErrCodeFlowControl, false) 621 return 622 } 623 if f.Header().Flags.Has(http2.FlagDataPadded) { 624 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { 625 t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) 626 } 627 } 628 // TODO(bradfitz, zhaoq): A copy is required here because there is no 629 // guarantee f.Data() is consumed before the arrival of next frame. 630 // Can this copy be eliminated? 631 if len(f.Data()) > 0 { 632 buffer := t.bufferPool.get() 633 buffer.Reset() 634 buffer.Write(f.Data()) 635 s.write(recvMsg{buffer: buffer}) 636 } 637 } 638 if f.Header().Flags.Has(http2.FlagDataEndStream) { 639 // Received the end of stream from the client. 640 s.compareAndSwapState(streamActive, streamReadDone) 641 s.write(recvMsg{err: io.EOF}) 642 } 643} 644 645func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { 646 // If the stream is not deleted from the transport's active streams map, then do a regular close stream. 
647 if s, ok := t.getStream(f); ok { 648 t.closeStream(s, false, 0, false) 649 return 650 } 651 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map. 652 t.controlBuf.put(&cleanupStream{ 653 streamID: f.Header().StreamID, 654 rst: false, 655 rstCode: 0, 656 onWrite: func() {}, 657 }) 658} 659 660func (t *http2Server) handleSettings(f *http2.SettingsFrame) { 661 if f.IsAck() { 662 return 663 } 664 var ss []http2.Setting 665 var updateFuncs []func() 666 f.ForeachSetting(func(s http2.Setting) error { 667 switch s.ID { 668 case http2.SettingMaxHeaderListSize: 669 updateFuncs = append(updateFuncs, func() { 670 t.maxSendHeaderListSize = new(uint32) 671 *t.maxSendHeaderListSize = s.Val 672 }) 673 default: 674 ss = append(ss, s) 675 } 676 return nil 677 }) 678 t.controlBuf.executeAndPut(func(interface{}) bool { 679 for _, f := range updateFuncs { 680 f() 681 } 682 return true 683 }, &incomingSettings{ 684 ss: ss, 685 }) 686} 687 688const ( 689 maxPingStrikes = 2 690 defaultPingTimeout = 2 * time.Hour 691) 692 693func (t *http2Server) handlePing(f *http2.PingFrame) { 694 if f.IsAck() { 695 if f.Data == goAwayPing.data && t.drainChan != nil { 696 close(t.drainChan) 697 return 698 } 699 // Maybe it's a BDP ping. 700 if t.bdpEst != nil { 701 t.bdpEst.calculate(f.Data) 702 } 703 return 704 } 705 pingAck := &ping{ack: true} 706 copy(pingAck.data[:], f.Data[:]) 707 t.controlBuf.put(pingAck) 708 709 now := time.Now() 710 defer func() { 711 t.lastPingAt = now 712 }() 713 // A reset ping strikes means that we don't need to check for policy 714 // violation for this ping and the pingStrikes counter should be set 715 // to 0. 
716 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { 717 t.pingStrikes = 0 718 return 719 } 720 t.mu.Lock() 721 ns := len(t.activeStreams) 722 t.mu.Unlock() 723 if ns < 1 && !t.kep.PermitWithoutStream { 724 // Keepalive shouldn't be active thus, this new ping should 725 // have come after at least defaultPingTimeout. 726 if t.lastPingAt.Add(defaultPingTimeout).After(now) { 727 t.pingStrikes++ 728 } 729 } else { 730 // Check if keepalive policy is respected. 731 if t.lastPingAt.Add(t.kep.MinTime).After(now) { 732 t.pingStrikes++ 733 } 734 } 735 736 if t.pingStrikes > maxPingStrikes { 737 // Send goaway and close the connection. 738 if logger.V(logLevel) { 739 logger.Errorf("transport: Got too many pings from the client, closing the connection.") 740 } 741 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) 742 } 743} 744 745func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { 746 t.controlBuf.put(&incomingWindowUpdate{ 747 streamID: f.Header().StreamID, 748 increment: f.Increment, 749 }) 750} 751 752func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField { 753 for k, vv := range md { 754 if isReservedHeader(k) { 755 // Clients don't tolerate reading restricted headers after some non restricted ones were sent. 
756 continue 757 } 758 for _, v := range vv { 759 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 760 } 761 } 762 return headerFields 763} 764 765func (t *http2Server) checkForHeaderListSize(it interface{}) bool { 766 if t.maxSendHeaderListSize == nil { 767 return true 768 } 769 hdrFrame := it.(*headerFrame) 770 var sz int64 771 for _, f := range hdrFrame.hf { 772 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { 773 if logger.V(logLevel) { 774 logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) 775 } 776 return false 777 } 778 } 779 return true 780} 781 782// WriteHeader sends the header metadata md back to the client. 783func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { 784 if s.updateHeaderSent() || s.getState() == streamDone { 785 return ErrIllegalHeaderWrite 786 } 787 s.hdrMu.Lock() 788 if md.Len() > 0 { 789 if s.header.Len() > 0 { 790 s.header = metadata.Join(s.header, md) 791 } else { 792 s.header = md 793 } 794 } 795 if err := t.writeHeaderLocked(s); err != nil { 796 s.hdrMu.Unlock() 797 return err 798 } 799 s.hdrMu.Unlock() 800 return nil 801} 802 803func (t *http2Server) setResetPingStrikes() { 804 atomic.StoreUint32(&t.resetPingStrikes, 1) 805} 806 807func (t *http2Server) writeHeaderLocked(s *Stream) error { 808 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 809 // first and create a slice of that exact size. 810 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. 
811 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) 812 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)}) 813 if s.sendCompress != "" { 814 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) 815 } 816 headerFields = appendHeaderFieldsFromMD(headerFields, s.header) 817 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{ 818 streamID: s.id, 819 hf: headerFields, 820 endStream: false, 821 onWrite: t.setResetPingStrikes, 822 }) 823 if !success { 824 if err != nil { 825 return err 826 } 827 t.closeStream(s, true, http2.ErrCodeInternal, false) 828 return ErrHeaderListSizeLimitViolation 829 } 830 if t.stats != nil { 831 // Note: Headers are compressed with hpack after this call returns. 832 // No WireLength field is set here. 833 outHeader := &stats.OutHeader{ 834 Header: s.header.Copy(), 835 Compression: s.sendCompress, 836 } 837 t.stats.HandleRPC(s.Context(), outHeader) 838 } 839 return nil 840} 841 842// WriteStatus sends stream status to the client and terminates the stream. 843// There is no further I/O operations being able to perform on this stream. 844// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early 845// OK is adopted. 846func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { 847 if s.getState() == streamDone { 848 return nil 849 } 850 s.hdrMu.Lock() 851 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 852 // first and create a slice of that exact size. 853 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. 854 if !s.updateHeaderSent() { // No headers have been sent. 855 if len(s.header) > 0 { // Send a separate header frame. 
856 if err := t.writeHeaderLocked(s); err != nil { 857 s.hdrMu.Unlock() 858 return err 859 } 860 } else { // Send a trailer only response. 861 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) 862 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)}) 863 } 864 } 865 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) 866 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) 867 868 if p := st.Proto(); p != nil && len(p.Details) > 0 { 869 stBytes, err := proto.Marshal(p) 870 if err != nil { 871 // TODO: return error instead, when callers are able to handle it. 872 logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err) 873 } else { 874 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) 875 } 876 } 877 878 // Attach the trailer metadata. 879 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer) 880 trailingHeader := &headerFrame{ 881 streamID: s.id, 882 hf: headerFields, 883 endStream: true, 884 onWrite: t.setResetPingStrikes, 885 } 886 s.hdrMu.Unlock() 887 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) 888 if !success { 889 if err != nil { 890 return err 891 } 892 t.closeStream(s, true, http2.ErrCodeInternal, false) 893 return ErrHeaderListSizeLimitViolation 894 } 895 // Send a RST_STREAM after the trailers if the client has not already half-closed. 896 rst := s.getState() == streamActive 897 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true) 898 if t.stats != nil { 899 // Note: The trailer fields are compressed with hpack after this call returns. 900 // No WireLength field is set here. 
901 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{ 902 Trailer: s.trailer.Copy(), 903 }) 904 } 905 return nil 906} 907 908// Write converts the data into HTTP2 data frame and sends it out. Non-nil error 909// is returns if it fails (e.g., framing error, transport error). 910func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { 911 if !s.isHeaderSent() { // Headers haven't been written yet. 912 if err := t.WriteHeader(s, nil); err != nil { 913 if _, ok := err.(ConnectionError); ok { 914 return err 915 } 916 // TODO(mmukhi, dfawley): Make sure this is the right code to return. 917 return status.Errorf(codes.Internal, "transport: %v", err) 918 } 919 } else { 920 // Writing headers checks for this condition. 921 if s.getState() == streamDone { 922 // TODO(mmukhi, dfawley): Should the server write also return io.EOF? 923 s.cancel() 924 select { 925 case <-t.done: 926 return ErrConnClosing 927 default: 928 } 929 return ContextErr(s.ctx.Err()) 930 } 931 } 932 df := &dataFrame{ 933 streamID: s.id, 934 h: hdr, 935 d: data, 936 onEachWrite: t.setResetPingStrikes, 937 } 938 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { 939 select { 940 case <-t.done: 941 return ErrConnClosing 942 default: 943 } 944 return ContextErr(s.ctx.Err()) 945 } 946 return t.controlBuf.put(df) 947} 948 949// keepalive running in a separate goroutine does the following: 950// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. 951// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. 952// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. 953// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection 954// after an additional duration of keepalive.Timeout. 
func (t *http2Server) keepalive() {
	// Reused for every keepalive ping; it is only ever queued on controlBuf.
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			// t.idle is guarded by t.mu; deleteStream sets it when the last
			// active stream goes away.
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// Time still left before the idle limit is reached.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				return
			}
			idleTimer.Reset(val)
		case <-ageTimer.C:
			// Start a graceful shutdown, then reuse the (already fired and
			// drained) ageTimer to bound how long draining may take.
			t.drain(http2.ErrCodeNo, []byte{})
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to maximum connection age.")
				}
				t.Close()
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			// NOTE(review): t.lastRead is presumably updated (as UnixNano)
			// by the reader on each read; the writer side is not shown here.
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Set up the timer to fire kp.Time after lastRead and
				// continue; any outstanding ping is considered answered.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// No reads at all while the ping's full timeout elapsed.
			if outstandingPing && kpTimeoutLeft <= 0 {
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to idleness.")
				}
				t.Close()
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// kpTimeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}

// Close starts shutting down the http2Server transport. It marks the
// transport as closing, stops the control buffer, closes t.done and the
// underlying connection, cancels all active streams and reports a ConnEnd
// event to the stats handler (if any). It returns the error from closing the
// connection, or an error if Close was already called.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
1052func (t *http2Server) Close() error { 1053 t.mu.Lock() 1054 if t.state == closing { 1055 t.mu.Unlock() 1056 return errors.New("transport: Close() was already called") 1057 } 1058 t.state = closing 1059 streams := t.activeStreams 1060 t.activeStreams = nil 1061 t.mu.Unlock() 1062 t.controlBuf.finish() 1063 close(t.done) 1064 err := t.conn.Close() 1065 if channelz.IsOn() { 1066 channelz.RemoveEntry(t.channelzID) 1067 } 1068 // Cancel all active streams. 1069 for _, s := range streams { 1070 s.cancel() 1071 } 1072 if t.stats != nil { 1073 connEnd := &stats.ConnEnd{} 1074 t.stats.HandleConn(t.ctx, connEnd) 1075 } 1076 return err 1077} 1078 1079// deleteStream deletes the stream s from transport's active streams. 1080func (t *http2Server) deleteStream(s *Stream, eosReceived bool) { 1081 // In case stream sending and receiving are invoked in separate 1082 // goroutines (e.g., bi-directional streaming), cancel needs to be 1083 // called to interrupt the potential blocking on other goroutines. 1084 s.cancel() 1085 1086 t.mu.Lock() 1087 if _, ok := t.activeStreams[s.id]; ok { 1088 delete(t.activeStreams, s.id) 1089 if len(t.activeStreams) == 0 { 1090 t.idle = time.Now() 1091 } 1092 } 1093 t.mu.Unlock() 1094 1095 if channelz.IsOn() { 1096 if eosReceived { 1097 atomic.AddInt64(&t.czData.streamsSucceeded, 1) 1098 } else { 1099 atomic.AddInt64(&t.czData.streamsFailed, 1) 1100 } 1101 } 1102} 1103 1104// finishStream closes the stream and puts the trailing headerFrame into controlbuf. 1105func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { 1106 oldState := s.swapState(streamDone) 1107 if oldState == streamDone { 1108 // If the stream was already done, return. 
1109 return 1110 } 1111 1112 hdr.cleanup = &cleanupStream{ 1113 streamID: s.id, 1114 rst: rst, 1115 rstCode: rstCode, 1116 onWrite: func() { 1117 t.deleteStream(s, eosReceived) 1118 }, 1119 } 1120 t.controlBuf.put(hdr) 1121} 1122 1123// closeStream clears the footprint of a stream when the stream is not needed any more. 1124func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) { 1125 s.swapState(streamDone) 1126 t.deleteStream(s, eosReceived) 1127 1128 t.controlBuf.put(&cleanupStream{ 1129 streamID: s.id, 1130 rst: rst, 1131 rstCode: rstCode, 1132 onWrite: func() {}, 1133 }) 1134} 1135 1136func (t *http2Server) RemoteAddr() net.Addr { 1137 return t.remoteAddr 1138} 1139 1140func (t *http2Server) Drain() { 1141 t.drain(http2.ErrCodeNo, []byte{}) 1142} 1143 1144func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { 1145 t.mu.Lock() 1146 defer t.mu.Unlock() 1147 if t.drainChan != nil { 1148 return 1149 } 1150 t.drainChan = make(chan struct{}) 1151 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) 1152} 1153 1154var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} 1155 1156// Handles outgoing GoAway and returns true if loopy needs to put itself 1157// in draining mode. 1158func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { 1159 t.mu.Lock() 1160 if t.state == closing { // TODO(mmukhi): This seems unnecessary. 1161 t.mu.Unlock() 1162 // The transport is closing. 1163 return false, ErrConnClosing 1164 } 1165 sid := t.maxStreamID 1166 if !g.headsUp { 1167 // Stop accepting more streams now. 1168 t.state = draining 1169 if len(t.activeStreams) == 0 { 1170 g.closeConn = true 1171 } 1172 t.mu.Unlock() 1173 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { 1174 return false, err 1175 } 1176 if g.closeConn { 1177 // Abruptly close the connection following the GoAway (via 1178 // loopywriter). But flush out what's inside the buffer first. 
1179 t.framer.writer.Flush() 1180 return false, fmt.Errorf("transport: Connection closing") 1181 } 1182 return true, nil 1183 } 1184 t.mu.Unlock() 1185 // For a graceful close, send out a GoAway with stream ID of MaxUInt32, 1186 // Follow that with a ping and wait for the ack to come back or a timer 1187 // to expire. During this time accept new streams since they might have 1188 // originated before the GoAway reaches the client. 1189 // After getting the ack or timer expiration send out another GoAway this 1190 // time with an ID of the max stream server intends to process. 1191 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { 1192 return false, err 1193 } 1194 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { 1195 return false, err 1196 } 1197 go func() { 1198 timer := time.NewTimer(time.Minute) 1199 defer timer.Stop() 1200 select { 1201 case <-t.drainChan: 1202 case <-timer.C: 1203 case <-t.done: 1204 return 1205 } 1206 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData}) 1207 }() 1208 return false, nil 1209} 1210 1211func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { 1212 s := channelz.SocketInternalMetric{ 1213 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), 1214 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), 1215 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), 1216 MessagesSent: atomic.LoadInt64(&t.czData.msgSent), 1217 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), 1218 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), 1219 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), 1220 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), 1221 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), 1222 LocalFlowControlWindow: int64(t.fc.getSize()), 1223 SocketOptions: channelz.GetSocketOption(t.conn), 1224 
LocalAddr: t.localAddr, 1225 RemoteAddr: t.remoteAddr, 1226 // RemoteName : 1227 } 1228 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { 1229 s.Security = au.GetSecurityValue() 1230 } 1231 s.RemoteFlowControlWindow = t.getOutFlowWindow() 1232 return &s 1233} 1234 1235func (t *http2Server) IncrMsgSent() { 1236 atomic.AddInt64(&t.czData.msgSent, 1) 1237 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) 1238} 1239 1240func (t *http2Server) IncrMsgRecv() { 1241 atomic.AddInt64(&t.czData.msgRecv, 1) 1242 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) 1243} 1244 1245func (t *http2Server) getOutFlowWindow() int64 { 1246 resp := make(chan uint32, 1) 1247 timer := time.NewTimer(time.Second) 1248 defer timer.Stop() 1249 t.controlBuf.put(&outFlowControlSizeRequest{resp}) 1250 select { 1251 case sz := <-resp: 1252 return int64(sz) 1253 case <-t.done: 1254 return -1 1255 case <-timer.C: 1256 return -2 1257 } 1258} 1259 1260func getJitter(v time.Duration) time.Duration { 1261 if v == infinity { 1262 return 0 1263 } 1264 // Generate a jitter between +/- 10% of the value. 1265 r := int64(v / 10) 1266 j := grpcrand.Int63n(2*r) - r 1267 return time.Duration(j) 1268} 1269