1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "bytes" 23 "context" 24 "errors" 25 "fmt" 26 "io" 27 "math" 28 "net" 29 "net/http" 30 "strconv" 31 "sync" 32 "sync/atomic" 33 "time" 34 35 "github.com/golang/protobuf/proto" 36 "golang.org/x/net/http2" 37 "golang.org/x/net/http2/hpack" 38 "google.golang.org/grpc/internal/grpcutil" 39 40 "google.golang.org/grpc/codes" 41 "google.golang.org/grpc/credentials" 42 "google.golang.org/grpc/internal/channelz" 43 "google.golang.org/grpc/internal/grpcrand" 44 "google.golang.org/grpc/keepalive" 45 "google.golang.org/grpc/metadata" 46 "google.golang.org/grpc/peer" 47 "google.golang.org/grpc/stats" 48 "google.golang.org/grpc/status" 49 "google.golang.org/grpc/tap" 50) 51 52var ( 53 // ErrIllegalHeaderWrite indicates that setting header is illegal because of 54 // the stream's state. 55 ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called") 56 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger 57 // than the limit set by peer. 58 ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer") 59) 60 61// serverConnectionCounter counts the number of connections a server has seen 62// (equal to the number of http2Servers created). Must be accessed atomically. 
var serverConnectionCounter uint64

// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx         context.Context
	done        chan struct{} // closed by Close(); also handed to the control buffer.
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instant at which the last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.Time.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	bufferPool *bufferPool

	connectionID uint64
}

// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
//
// The constructor performs the server half of the HTTP/2 connection setup in
// a fixed order: it writes the initial SETTINGS (and, if needed, a
// connection-level WINDOW_UPDATE), flushes them, reads and validates the
// client preface, reads the client's initial SETTINGS frame, and only then
// starts the loopy writer and keepalive goroutines. The statement order
// matters and should not be rearranged casually.
//
// err is a named result so that the deferred cleanup below can observe the
// error being returned and close the half-built transport.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		// 0 means "unlimited": no SETTINGS_MAX_CONCURRENT_STREAMS is sent.
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// Dynamic (BDP-based) window sizing stays enabled only while neither the
	// stream window nor the connection window is explicitly configured to a
	// value of at least defaultWindowSize.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Fill in defaults for every keepalive parameter left at its zero value.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	done := make(chan struct{})
	t := &http2Server{
		ctx:               context.Background(),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)

	// Push the SETTINGS (and optional WINDOW_UPDATE) written above to the wire.
	// NOTE(review): the error returned by Flush is not checked here.
	t.framer.writer.Flush()

	// From here on, any error return tears the transport down via Close().
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	// The first frame after the preface must be the client's SETTINGS frame.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		// EOF-style errors are returned unwrapped.
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// Writer goroutine: runs the loopy writer until it returns, then closes
	// the connection and signals writerDone.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			if logger.V(logLevel) {
				logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
			}
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}

// operateHeaders takes action on the decoded headers.
305func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { 306 streamID := frame.Header().StreamID 307 state := &decodeState{ 308 serverSide: true, 309 } 310 if h2code, err := state.decodeHeader(frame); err != nil { 311 if _, ok := status.FromError(err); ok { 312 t.controlBuf.put(&cleanupStream{ 313 streamID: streamID, 314 rst: true, 315 rstCode: h2code, 316 onWrite: func() {}, 317 }) 318 } 319 return false 320 } 321 322 buf := newRecvBuffer() 323 s := &Stream{ 324 id: streamID, 325 st: t, 326 buf: buf, 327 fc: &inFlow{limit: uint32(t.initialWindowSize)}, 328 recvCompress: state.data.encoding, 329 method: state.data.method, 330 contentSubtype: state.data.contentSubtype, 331 } 332 if frame.StreamEnded() { 333 // s is just created by the caller. No lock needed. 334 s.state = streamReadDone 335 } 336 if state.data.timeoutSet { 337 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout) 338 } else { 339 s.ctx, s.cancel = context.WithCancel(t.ctx) 340 } 341 pr := &peer.Peer{ 342 Addr: t.remoteAddr, 343 } 344 // Attach Auth info if there is any. 345 if t.authInfo != nil { 346 pr.AuthInfo = t.authInfo 347 } 348 s.ctx = peer.NewContext(s.ctx, pr) 349 // Attach the received metadata to the context. 
350 if len(state.data.mdata) > 0 { 351 s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata) 352 } 353 if state.data.statsTags != nil { 354 s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags) 355 } 356 if state.data.statsTrace != nil { 357 s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace) 358 } 359 t.mu.Lock() 360 if t.state != reachable { 361 t.mu.Unlock() 362 s.cancel() 363 return false 364 } 365 if uint32(len(t.activeStreams)) >= t.maxStreams { 366 t.mu.Unlock() 367 t.controlBuf.put(&cleanupStream{ 368 streamID: streamID, 369 rst: true, 370 rstCode: http2.ErrCodeRefusedStream, 371 onWrite: func() {}, 372 }) 373 s.cancel() 374 return false 375 } 376 if streamID%2 != 1 || streamID <= t.maxStreamID { 377 t.mu.Unlock() 378 // illegal gRPC stream id. 379 if logger.V(logLevel) { 380 logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) 381 } 382 s.cancel() 383 return true 384 } 385 t.maxStreamID = streamID 386 if state.data.httpMethod != http.MethodPost { 387 t.mu.Unlock() 388 if logger.V(logLevel) { 389 logger.Warningf("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", state.data.httpMethod) 390 } 391 t.controlBuf.put(&cleanupStream{ 392 streamID: streamID, 393 rst: true, 394 rstCode: http2.ErrCodeProtocol, 395 onWrite: func() {}, 396 }) 397 s.cancel() 398 return false 399 } 400 if t.inTapHandle != nil { 401 var err error 402 if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: state.data.method}); err != nil { 403 t.mu.Unlock() 404 if logger.V(logLevel) { 405 logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) 406 } 407 stat, ok := status.FromError(err) 408 if !ok { 409 stat = status.New(codes.PermissionDenied, err.Error()) 410 } 411 t.controlBuf.put(&earlyAbortStream{ 412 streamID: s.id, 413 contentSubtype: s.contentSubtype, 414 status: stat, 415 }) 416 return false 417 } 418 } 419 t.activeStreams[streamID] = 
s 420 if len(t.activeStreams) == 1 { 421 t.idle = time.Time{} 422 } 423 t.mu.Unlock() 424 if channelz.IsOn() { 425 atomic.AddInt64(&t.czData.streamsStarted, 1) 426 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) 427 } 428 s.requestRead = func(n int) { 429 t.adjustWindow(s, uint32(n)) 430 } 431 s.ctx = traceCtx(s.ctx, s.method) 432 if t.stats != nil { 433 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) 434 inHeader := &stats.InHeader{ 435 FullMethod: s.method, 436 RemoteAddr: t.remoteAddr, 437 LocalAddr: t.localAddr, 438 Compression: s.recvCompress, 439 WireLength: int(frame.Header().Length), 440 Header: metadata.MD(state.data.mdata).Copy(), 441 } 442 t.stats.HandleRPC(s.ctx, inHeader) 443 } 444 s.ctxDone = s.ctx.Done() 445 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) 446 s.trReader = &transportReader{ 447 reader: &recvBufferReader{ 448 ctx: s.ctx, 449 ctxDone: s.ctxDone, 450 recv: s.buf, 451 freeBuffer: t.bufferPool.put, 452 }, 453 windowHandler: func(n int) { 454 t.updateWindow(s, uint32(n)) 455 }, 456 } 457 // Register the stream with loopy. 458 t.controlBuf.put(®isterStream{ 459 streamID: s.id, 460 wq: s.wq, 461 }) 462 handle(s) 463 return false 464} 465 466// HandleStreams receives incoming streams using the given handler. This is 467// typically run in a separate goroutine. 468// traceCtx attaches trace to ctx and returns the new context. 
469func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { 470 defer close(t.readerDone) 471 for { 472 t.controlBuf.throttle() 473 frame, err := t.framer.fr.ReadFrame() 474 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) 475 if err != nil { 476 if se, ok := err.(http2.StreamError); ok { 477 if logger.V(logLevel) { 478 logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se) 479 } 480 t.mu.Lock() 481 s := t.activeStreams[se.StreamID] 482 t.mu.Unlock() 483 if s != nil { 484 t.closeStream(s, true, se.Code, false) 485 } else { 486 t.controlBuf.put(&cleanupStream{ 487 streamID: se.StreamID, 488 rst: true, 489 rstCode: se.Code, 490 onWrite: func() {}, 491 }) 492 } 493 continue 494 } 495 if err == io.EOF || err == io.ErrUnexpectedEOF { 496 t.Close() 497 return 498 } 499 if logger.V(logLevel) { 500 logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) 501 } 502 t.Close() 503 return 504 } 505 switch frame := frame.(type) { 506 case *http2.MetaHeadersFrame: 507 if t.operateHeaders(frame, handle, traceCtx) { 508 t.Close() 509 break 510 } 511 case *http2.DataFrame: 512 t.handleData(frame) 513 case *http2.RSTStreamFrame: 514 t.handleRSTStream(frame) 515 case *http2.SettingsFrame: 516 t.handleSettings(frame) 517 case *http2.PingFrame: 518 t.handlePing(frame) 519 case *http2.WindowUpdateFrame: 520 t.handleWindowUpdate(frame) 521 case *http2.GoAwayFrame: 522 // TODO: Handle GoAway from the client appropriately. 523 default: 524 if logger.V(logLevel) { 525 logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) 526 } 527 } 528 } 529} 530 531func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { 532 t.mu.Lock() 533 defer t.mu.Unlock() 534 if t.activeStreams == nil { 535 // The transport is closing. 
536 return nil, false 537 } 538 s, ok := t.activeStreams[f.Header().StreamID] 539 if !ok { 540 // The stream is already done. 541 return nil, false 542 } 543 return s, true 544} 545 546// adjustWindow sends out extra window update over the initial window size 547// of stream if the application is requesting data larger in size than 548// the window. 549func (t *http2Server) adjustWindow(s *Stream, n uint32) { 550 if w := s.fc.maybeAdjust(n); w > 0 { 551 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 552 } 553 554} 555 556// updateWindow adjusts the inbound quota for the stream and the transport. 557// Window updates will deliver to the controller for sending when 558// the cumulative quota exceeds the corresponding threshold. 559func (t *http2Server) updateWindow(s *Stream, n uint32) { 560 if w := s.fc.onRead(n); w > 0 { 561 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, 562 increment: w, 563 }) 564 } 565} 566 567// updateFlowControl updates the incoming flow control windows 568// for the transport and the stream based on the current bdp 569// estimation. 570func (t *http2Server) updateFlowControl(n uint32) { 571 t.mu.Lock() 572 for _, s := range t.activeStreams { 573 s.fc.newLimit(n) 574 } 575 t.initialWindowSize = int32(n) 576 t.mu.Unlock() 577 t.controlBuf.put(&outgoingWindowUpdate{ 578 streamID: 0, 579 increment: t.fc.newLimit(n), 580 }) 581 t.controlBuf.put(&outgoingSettings{ 582 ss: []http2.Setting{ 583 { 584 ID: http2.SettingInitialWindowSize, 585 Val: n, 586 }, 587 }, 588 }) 589 590} 591 592func (t *http2Server) handleData(f *http2.DataFrame) { 593 size := f.Header().Length 594 var sendBDPPing bool 595 if t.bdpEst != nil { 596 sendBDPPing = t.bdpEst.add(size) 597 } 598 // Decouple connection's flow control from application's read. 599 // An update on connection's flow control should not depend on 600 // whether user application has read the data or not. 
Such a 601 // restriction is already imposed on the stream's flow control, 602 // and therefore the sender will be blocked anyways. 603 // Decoupling the connection flow control will prevent other 604 // active(fast) streams from starving in presence of slow or 605 // inactive streams. 606 if w := t.fc.onData(size); w > 0 { 607 t.controlBuf.put(&outgoingWindowUpdate{ 608 streamID: 0, 609 increment: w, 610 }) 611 } 612 if sendBDPPing { 613 // Avoid excessive ping detection (e.g. in an L7 proxy) 614 // by sending a window update prior to the BDP ping. 615 if w := t.fc.reset(); w > 0 { 616 t.controlBuf.put(&outgoingWindowUpdate{ 617 streamID: 0, 618 increment: w, 619 }) 620 } 621 t.controlBuf.put(bdpPing) 622 } 623 // Select the right stream to dispatch. 624 s, ok := t.getStream(f) 625 if !ok { 626 return 627 } 628 if s.getState() == streamReadDone { 629 t.closeStream(s, true, http2.ErrCodeStreamClosed, false) 630 return 631 } 632 if size > 0 { 633 if err := s.fc.onData(size); err != nil { 634 t.closeStream(s, true, http2.ErrCodeFlowControl, false) 635 return 636 } 637 if f.Header().Flags.Has(http2.FlagDataPadded) { 638 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { 639 t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) 640 } 641 } 642 // TODO(bradfitz, zhaoq): A copy is required here because there is no 643 // guarantee f.Data() is consumed before the arrival of next frame. 644 // Can this copy be eliminated? 645 if len(f.Data()) > 0 { 646 buffer := t.bufferPool.get() 647 buffer.Reset() 648 buffer.Write(f.Data()) 649 s.write(recvMsg{buffer: buffer}) 650 } 651 } 652 if f.Header().Flags.Has(http2.FlagDataEndStream) { 653 // Received the end of stream from the client. 654 s.compareAndSwapState(streamActive, streamReadDone) 655 s.write(recvMsg{err: io.EOF}) 656 } 657} 658 659func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { 660 // If the stream is not deleted from the transport's active streams map, then do a regular close stream. 
661 if s, ok := t.getStream(f); ok { 662 t.closeStream(s, false, 0, false) 663 return 664 } 665 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map. 666 t.controlBuf.put(&cleanupStream{ 667 streamID: f.Header().StreamID, 668 rst: false, 669 rstCode: 0, 670 onWrite: func() {}, 671 }) 672} 673 674func (t *http2Server) handleSettings(f *http2.SettingsFrame) { 675 if f.IsAck() { 676 return 677 } 678 var ss []http2.Setting 679 var updateFuncs []func() 680 f.ForeachSetting(func(s http2.Setting) error { 681 switch s.ID { 682 case http2.SettingMaxHeaderListSize: 683 updateFuncs = append(updateFuncs, func() { 684 t.maxSendHeaderListSize = new(uint32) 685 *t.maxSendHeaderListSize = s.Val 686 }) 687 default: 688 ss = append(ss, s) 689 } 690 return nil 691 }) 692 t.controlBuf.executeAndPut(func(interface{}) bool { 693 for _, f := range updateFuncs { 694 f() 695 } 696 return true 697 }, &incomingSettings{ 698 ss: ss, 699 }) 700} 701 702const ( 703 maxPingStrikes = 2 704 defaultPingTimeout = 2 * time.Hour 705) 706 707func (t *http2Server) handlePing(f *http2.PingFrame) { 708 if f.IsAck() { 709 if f.Data == goAwayPing.data && t.drainChan != nil { 710 close(t.drainChan) 711 return 712 } 713 // Maybe it's a BDP ping. 714 if t.bdpEst != nil { 715 t.bdpEst.calculate(f.Data) 716 } 717 return 718 } 719 pingAck := &ping{ack: true} 720 copy(pingAck.data[:], f.Data[:]) 721 t.controlBuf.put(pingAck) 722 723 now := time.Now() 724 defer func() { 725 t.lastPingAt = now 726 }() 727 // A reset ping strikes means that we don't need to check for policy 728 // violation for this ping and the pingStrikes counter should be set 729 // to 0. 
730 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { 731 t.pingStrikes = 0 732 return 733 } 734 t.mu.Lock() 735 ns := len(t.activeStreams) 736 t.mu.Unlock() 737 if ns < 1 && !t.kep.PermitWithoutStream { 738 // Keepalive shouldn't be active thus, this new ping should 739 // have come after at least defaultPingTimeout. 740 if t.lastPingAt.Add(defaultPingTimeout).After(now) { 741 t.pingStrikes++ 742 } 743 } else { 744 // Check if keepalive policy is respected. 745 if t.lastPingAt.Add(t.kep.MinTime).After(now) { 746 t.pingStrikes++ 747 } 748 } 749 750 if t.pingStrikes > maxPingStrikes { 751 // Send goaway and close the connection. 752 if logger.V(logLevel) { 753 logger.Errorf("transport: Got too many pings from the client, closing the connection.") 754 } 755 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) 756 } 757} 758 759func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { 760 t.controlBuf.put(&incomingWindowUpdate{ 761 streamID: f.Header().StreamID, 762 increment: f.Increment, 763 }) 764} 765 766func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField { 767 for k, vv := range md { 768 if isReservedHeader(k) { 769 // Clients don't tolerate reading restricted headers after some non restricted ones were sent. 
770 continue 771 } 772 for _, v := range vv { 773 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 774 } 775 } 776 return headerFields 777} 778 779func (t *http2Server) checkForHeaderListSize(it interface{}) bool { 780 if t.maxSendHeaderListSize == nil { 781 return true 782 } 783 hdrFrame := it.(*headerFrame) 784 var sz int64 785 for _, f := range hdrFrame.hf { 786 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { 787 if logger.V(logLevel) { 788 logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) 789 } 790 return false 791 } 792 } 793 return true 794} 795 796// WriteHeader sends the header metadata md back to the client. 797func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { 798 if s.updateHeaderSent() || s.getState() == streamDone { 799 return ErrIllegalHeaderWrite 800 } 801 s.hdrMu.Lock() 802 if md.Len() > 0 { 803 if s.header.Len() > 0 { 804 s.header = metadata.Join(s.header, md) 805 } else { 806 s.header = md 807 } 808 } 809 if err := t.writeHeaderLocked(s); err != nil { 810 s.hdrMu.Unlock() 811 return err 812 } 813 s.hdrMu.Unlock() 814 return nil 815} 816 817func (t *http2Server) setResetPingStrikes() { 818 atomic.StoreUint32(&t.resetPingStrikes, 1) 819} 820 821func (t *http2Server) writeHeaderLocked(s *Stream) error { 822 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 823 // first and create a slice of that exact size. 824 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. 
825 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) 826 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)}) 827 if s.sendCompress != "" { 828 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) 829 } 830 headerFields = appendHeaderFieldsFromMD(headerFields, s.header) 831 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{ 832 streamID: s.id, 833 hf: headerFields, 834 endStream: false, 835 onWrite: t.setResetPingStrikes, 836 }) 837 if !success { 838 if err != nil { 839 return err 840 } 841 t.closeStream(s, true, http2.ErrCodeInternal, false) 842 return ErrHeaderListSizeLimitViolation 843 } 844 if t.stats != nil { 845 // Note: Headers are compressed with hpack after this call returns. 846 // No WireLength field is set here. 847 outHeader := &stats.OutHeader{ 848 Header: s.header.Copy(), 849 Compression: s.sendCompress, 850 } 851 t.stats.HandleRPC(s.Context(), outHeader) 852 } 853 return nil 854} 855 856// WriteStatus sends stream status to the client and terminates the stream. 857// There is no further I/O operations being able to perform on this stream. 858// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early 859// OK is adopted. 860func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { 861 if s.getState() == streamDone { 862 return nil 863 } 864 s.hdrMu.Lock() 865 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 866 // first and create a slice of that exact size. 867 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. 868 if !s.updateHeaderSent() { // No headers have been sent. 869 if len(s.header) > 0 { // Send a separate header frame. 
870 if err := t.writeHeaderLocked(s); err != nil { 871 s.hdrMu.Unlock() 872 return err 873 } 874 } else { // Send a trailer only response. 875 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) 876 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)}) 877 } 878 } 879 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) 880 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) 881 882 if p := st.Proto(); p != nil && len(p.Details) > 0 { 883 stBytes, err := proto.Marshal(p) 884 if err != nil { 885 // TODO: return error instead, when callers are able to handle it. 886 logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err) 887 } else { 888 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) 889 } 890 } 891 892 // Attach the trailer metadata. 893 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer) 894 trailingHeader := &headerFrame{ 895 streamID: s.id, 896 hf: headerFields, 897 endStream: true, 898 onWrite: t.setResetPingStrikes, 899 } 900 s.hdrMu.Unlock() 901 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) 902 if !success { 903 if err != nil { 904 return err 905 } 906 t.closeStream(s, true, http2.ErrCodeInternal, false) 907 return ErrHeaderListSizeLimitViolation 908 } 909 // Send a RST_STREAM after the trailers if the client has not already half-closed. 910 rst := s.getState() == streamActive 911 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true) 912 if t.stats != nil { 913 // Note: The trailer fields are compressed with hpack after this call returns. 914 // No WireLength field is set here. 
915 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{ 916 Trailer: s.trailer.Copy(), 917 }) 918 } 919 return nil 920} 921 922// Write converts the data into HTTP2 data frame and sends it out. Non-nil error 923// is returns if it fails (e.g., framing error, transport error). 924func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { 925 if !s.isHeaderSent() { // Headers haven't been written yet. 926 if err := t.WriteHeader(s, nil); err != nil { 927 if _, ok := err.(ConnectionError); ok { 928 return err 929 } 930 // TODO(mmukhi, dfawley): Make sure this is the right code to return. 931 return status.Errorf(codes.Internal, "transport: %v", err) 932 } 933 } else { 934 // Writing headers checks for this condition. 935 if s.getState() == streamDone { 936 // TODO(mmukhi, dfawley): Should the server write also return io.EOF? 937 s.cancel() 938 select { 939 case <-t.done: 940 return ErrConnClosing 941 default: 942 } 943 return ContextErr(s.ctx.Err()) 944 } 945 } 946 df := &dataFrame{ 947 streamID: s.id, 948 h: hdr, 949 d: data, 950 onEachWrite: t.setResetPingStrikes, 951 } 952 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { 953 select { 954 case <-t.done: 955 return ErrConnClosing 956 default: 957 } 958 return ContextErr(s.ctx.Err()) 959 } 960 return t.controlBuf.put(df) 961} 962 963// keepalive running in a separate goroutine does the following: 964// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. 965// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. 966// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. 967// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection 968// after an additional duration of keepalive.Timeout. 
969func (t *http2Server) keepalive() { 970 p := &ping{} 971 // True iff a ping has been sent, and no data has been received since then. 972 outstandingPing := false 973 // Amount of time remaining before which we should receive an ACK for the 974 // last sent ping. 975 kpTimeoutLeft := time.Duration(0) 976 // Records the last value of t.lastRead before we go block on the timer. 977 // This is required to check for read activity since then. 978 prevNano := time.Now().UnixNano() 979 // Initialize the different timers to their default values. 980 idleTimer := time.NewTimer(t.kp.MaxConnectionIdle) 981 ageTimer := time.NewTimer(t.kp.MaxConnectionAge) 982 kpTimer := time.NewTimer(t.kp.Time) 983 defer func() { 984 // We need to drain the underlying channel in these timers after a call 985 // to Stop(), only if we are interested in resetting them. Clearly we 986 // are not interested in resetting them here. 987 idleTimer.Stop() 988 ageTimer.Stop() 989 kpTimer.Stop() 990 }() 991 992 for { 993 select { 994 case <-idleTimer.C: 995 t.mu.Lock() 996 idle := t.idle 997 if idle.IsZero() { // The connection is non-idle. 998 t.mu.Unlock() 999 idleTimer.Reset(t.kp.MaxConnectionIdle) 1000 continue 1001 } 1002 val := t.kp.MaxConnectionIdle - time.Since(idle) 1003 t.mu.Unlock() 1004 if val <= 0 { 1005 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. 1006 // Gracefully close the connection. 1007 t.drain(http2.ErrCodeNo, []byte{}) 1008 return 1009 } 1010 idleTimer.Reset(val) 1011 case <-ageTimer.C: 1012 t.drain(http2.ErrCodeNo, []byte{}) 1013 ageTimer.Reset(t.kp.MaxConnectionAgeGrace) 1014 select { 1015 case <-ageTimer.C: 1016 // Close the connection after grace period. 
1017 if logger.V(logLevel) { 1018 logger.Infof("transport: closing server transport due to maximum connection age.") 1019 } 1020 t.Close() 1021 case <-t.done: 1022 } 1023 return 1024 case <-kpTimer.C: 1025 lastRead := atomic.LoadInt64(&t.lastRead) 1026 if lastRead > prevNano { 1027 // There has been read activity since the last time we were 1028 // here. Setup the timer to fire at kp.Time seconds from 1029 // lastRead time and continue. 1030 outstandingPing = false 1031 kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano())) 1032 prevNano = lastRead 1033 continue 1034 } 1035 if outstandingPing && kpTimeoutLeft <= 0 { 1036 if logger.V(logLevel) { 1037 logger.Infof("transport: closing server transport due to idleness.") 1038 } 1039 t.Close() 1040 return 1041 } 1042 if !outstandingPing { 1043 if channelz.IsOn() { 1044 atomic.AddInt64(&t.czData.kpCount, 1) 1045 } 1046 t.controlBuf.put(p) 1047 kpTimeoutLeft = t.kp.Timeout 1048 outstandingPing = true 1049 } 1050 // The amount of time to sleep here is the minimum of kp.Time and 1051 // timeoutLeft. This will ensure that we wait only for kp.Time 1052 // before sending out the next ping (for cases where the ping is 1053 // acked). 1054 sleepDuration := minTime(t.kp.Time, kpTimeoutLeft) 1055 kpTimeoutLeft -= sleepDuration 1056 kpTimer.Reset(sleepDuration) 1057 case <-t.done: 1058 return 1059 } 1060 } 1061} 1062 1063// Close starts shutting down the http2Server transport. 1064// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This 1065// could cause some resource issue. Revisit this later. 
1066func (t *http2Server) Close() error { 1067 t.mu.Lock() 1068 if t.state == closing { 1069 t.mu.Unlock() 1070 return errors.New("transport: Close() was already called") 1071 } 1072 t.state = closing 1073 streams := t.activeStreams 1074 t.activeStreams = nil 1075 t.mu.Unlock() 1076 t.controlBuf.finish() 1077 close(t.done) 1078 err := t.conn.Close() 1079 if channelz.IsOn() { 1080 channelz.RemoveEntry(t.channelzID) 1081 } 1082 // Cancel all active streams. 1083 for _, s := range streams { 1084 s.cancel() 1085 } 1086 if t.stats != nil { 1087 connEnd := &stats.ConnEnd{} 1088 t.stats.HandleConn(t.ctx, connEnd) 1089 } 1090 return err 1091} 1092 1093// deleteStream deletes the stream s from transport's active streams. 1094func (t *http2Server) deleteStream(s *Stream, eosReceived bool) { 1095 // In case stream sending and receiving are invoked in separate 1096 // goroutines (e.g., bi-directional streaming), cancel needs to be 1097 // called to interrupt the potential blocking on other goroutines. 1098 s.cancel() 1099 1100 t.mu.Lock() 1101 if _, ok := t.activeStreams[s.id]; ok { 1102 delete(t.activeStreams, s.id) 1103 if len(t.activeStreams) == 0 { 1104 t.idle = time.Now() 1105 } 1106 } 1107 t.mu.Unlock() 1108 1109 if channelz.IsOn() { 1110 if eosReceived { 1111 atomic.AddInt64(&t.czData.streamsSucceeded, 1) 1112 } else { 1113 atomic.AddInt64(&t.czData.streamsFailed, 1) 1114 } 1115 } 1116} 1117 1118// finishStream closes the stream and puts the trailing headerFrame into controlbuf. 1119func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { 1120 oldState := s.swapState(streamDone) 1121 if oldState == streamDone { 1122 // If the stream was already done, return. 
1123 return 1124 } 1125 1126 hdr.cleanup = &cleanupStream{ 1127 streamID: s.id, 1128 rst: rst, 1129 rstCode: rstCode, 1130 onWrite: func() { 1131 t.deleteStream(s, eosReceived) 1132 }, 1133 } 1134 t.controlBuf.put(hdr) 1135} 1136 1137// closeStream clears the footprint of a stream when the stream is not needed any more. 1138func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) { 1139 s.swapState(streamDone) 1140 t.deleteStream(s, eosReceived) 1141 1142 t.controlBuf.put(&cleanupStream{ 1143 streamID: s.id, 1144 rst: rst, 1145 rstCode: rstCode, 1146 onWrite: func() {}, 1147 }) 1148} 1149 1150func (t *http2Server) RemoteAddr() net.Addr { 1151 return t.remoteAddr 1152} 1153 1154func (t *http2Server) Drain() { 1155 t.drain(http2.ErrCodeNo, []byte{}) 1156} 1157 1158func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { 1159 t.mu.Lock() 1160 defer t.mu.Unlock() 1161 if t.drainChan != nil { 1162 return 1163 } 1164 t.drainChan = make(chan struct{}) 1165 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) 1166} 1167 1168var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} 1169 1170// Handles outgoing GoAway and returns true if loopy needs to put itself 1171// in draining mode. 1172func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { 1173 t.mu.Lock() 1174 if t.state == closing { // TODO(mmukhi): This seems unnecessary. 1175 t.mu.Unlock() 1176 // The transport is closing. 1177 return false, ErrConnClosing 1178 } 1179 sid := t.maxStreamID 1180 if !g.headsUp { 1181 // Stop accepting more streams now. 1182 t.state = draining 1183 if len(t.activeStreams) == 0 { 1184 g.closeConn = true 1185 } 1186 t.mu.Unlock() 1187 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { 1188 return false, err 1189 } 1190 if g.closeConn { 1191 // Abruptly close the connection following the GoAway (via 1192 // loopywriter). But flush out what's inside the buffer first. 
1193 t.framer.writer.Flush() 1194 return false, fmt.Errorf("transport: Connection closing") 1195 } 1196 return true, nil 1197 } 1198 t.mu.Unlock() 1199 // For a graceful close, send out a GoAway with stream ID of MaxUInt32, 1200 // Follow that with a ping and wait for the ack to come back or a timer 1201 // to expire. During this time accept new streams since they might have 1202 // originated before the GoAway reaches the client. 1203 // After getting the ack or timer expiration send out another GoAway this 1204 // time with an ID of the max stream server intends to process. 1205 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { 1206 return false, err 1207 } 1208 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { 1209 return false, err 1210 } 1211 go func() { 1212 timer := time.NewTimer(time.Minute) 1213 defer timer.Stop() 1214 select { 1215 case <-t.drainChan: 1216 case <-timer.C: 1217 case <-t.done: 1218 return 1219 } 1220 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData}) 1221 }() 1222 return false, nil 1223} 1224 1225func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { 1226 s := channelz.SocketInternalMetric{ 1227 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), 1228 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), 1229 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), 1230 MessagesSent: atomic.LoadInt64(&t.czData.msgSent), 1231 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), 1232 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), 1233 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), 1234 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), 1235 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), 1236 LocalFlowControlWindow: int64(t.fc.getSize()), 1237 SocketOptions: channelz.GetSocketOption(t.conn), 1238 
LocalAddr: t.localAddr, 1239 RemoteAddr: t.remoteAddr, 1240 // RemoteName : 1241 } 1242 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { 1243 s.Security = au.GetSecurityValue() 1244 } 1245 s.RemoteFlowControlWindow = t.getOutFlowWindow() 1246 return &s 1247} 1248 1249func (t *http2Server) IncrMsgSent() { 1250 atomic.AddInt64(&t.czData.msgSent, 1) 1251 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) 1252} 1253 1254func (t *http2Server) IncrMsgRecv() { 1255 atomic.AddInt64(&t.czData.msgRecv, 1) 1256 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) 1257} 1258 1259func (t *http2Server) getOutFlowWindow() int64 { 1260 resp := make(chan uint32, 1) 1261 timer := time.NewTimer(time.Second) 1262 defer timer.Stop() 1263 t.controlBuf.put(&outFlowControlSizeRequest{resp}) 1264 select { 1265 case sz := <-resp: 1266 return int64(sz) 1267 case <-t.done: 1268 return -1 1269 case <-timer.C: 1270 return -2 1271 } 1272} 1273 1274func getJitter(v time.Duration) time.Duration { 1275 if v == infinity { 1276 return 0 1277 } 1278 // Generate a jitter between +/- 10% of the value. 1279 r := int64(v / 10) 1280 j := grpcrand.Int63n(2*r) - r 1281 return time.Duration(j) 1282} 1283