1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "bytes" 23 "errors" 24 "io" 25 "math" 26 "math/rand" 27 "net" 28 "strconv" 29 "sync" 30 "sync/atomic" 31 "time" 32 33 "github.com/golang/protobuf/proto" 34 "golang.org/x/net/context" 35 "golang.org/x/net/http2" 36 "golang.org/x/net/http2/hpack" 37 "google.golang.org/grpc/codes" 38 "google.golang.org/grpc/credentials" 39 "google.golang.org/grpc/keepalive" 40 "google.golang.org/grpc/metadata" 41 "google.golang.org/grpc/peer" 42 "google.golang.org/grpc/stats" 43 "google.golang.org/grpc/status" 44 "google.golang.org/grpc/tap" 45) 46 47// ErrIllegalHeaderWrite indicates that setting header is illegal because of 48// the stream's state. 49var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called") 50 51// http2Server implements the ServerTransport interface with HTTP2. 52type http2Server struct { 53 ctx context.Context 54 conn net.Conn 55 remoteAddr net.Addr 56 localAddr net.Addr 57 maxStreamID uint32 // max stream ID ever seen 58 authInfo credentials.AuthInfo // auth info about the connection 59 inTapHandle tap.ServerInHandle 60 // writableChan synchronizes write access to the transport. 61 // A writer acquires the write lock by receiving a value on writableChan 62 // and releases it by sending on writableChan. 
63 writableChan chan int 64 // shutdownChan is closed when Close is called. 65 // Blocking operations should select on shutdownChan to avoid 66 // blocking forever after Close. 67 shutdownChan chan struct{} 68 framer *framer 69 hBuf *bytes.Buffer // the buffer for HPACK encoding 70 hEnc *hpack.Encoder // HPACK encoder 71 // The max number of concurrent streams. 72 maxStreams uint32 73 // controlBuf delivers all the control related tasks (e.g., window 74 // updates, reset streams, and various settings) to the controller. 75 controlBuf *controlBuffer 76 fc *inFlow 77 // sendQuotaPool provides flow control to outbound message. 78 sendQuotaPool *quotaPool 79 stats stats.Handler 80 // Flag to keep track of reading activity on transport. 81 // 1 is true and 0 is false. 82 activity uint32 // Accessed atomically. 83 // Keepalive and max-age parameters for the server. 84 kp keepalive.ServerParameters 85 86 // Keepalive enforcement policy. 87 kep keepalive.EnforcementPolicy 88 // The time instance last ping was received. 89 lastPingAt time.Time 90 // Number of times the client has violated keepalive ping policy so far. 91 pingStrikes uint8 92 // Flag to signify that number of ping strikes should be reset to 0. 93 // This is set whenever data or header frames are sent. 94 // 1 means yes. 95 resetPingStrikes uint32 // Accessed atomically. 96 initialWindowSize int32 97 bdpEst *bdpEstimator 98 99 outQuotaVersion uint32 100 101 mu sync.Mutex // guard the following 102 103 // drainChan is initialized when drain(...) is called the first time. 104 // After which the server writes out the first GoAway(with ID 2^31-1) frame. 105 // Then an independent goroutine will be launched to later send the second GoAway. 106 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. 107 // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is 108 // already underway. 
109 drainChan chan struct{} 110 state transportState 111 activeStreams map[uint32]*Stream 112 // the per-stream outbound flow control window size set by the peer. 113 streamSendQuota uint32 114 // idle is the time instant when the connection went idle. 115 // This is either the begining of the connection or when the number of 116 // RPCs go down to 0. 117 // When the connection is busy, this value is set to 0. 118 idle time.Time 119} 120 121// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is 122// returned if something goes wrong. 123func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { 124 framer := newFramer(conn) 125 // Send initial settings as connection preface to client. 126 var isettings []http2.Setting 127 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is 128 // permitted in the HTTP2 spec. 129 maxStreams := config.MaxStreams 130 if maxStreams == 0 { 131 maxStreams = math.MaxUint32 132 } else { 133 isettings = append(isettings, http2.Setting{ 134 ID: http2.SettingMaxConcurrentStreams, 135 Val: maxStreams, 136 }) 137 } 138 dynamicWindow := true 139 iwz := int32(initialWindowSize) 140 if config.InitialWindowSize >= defaultWindowSize { 141 iwz = config.InitialWindowSize 142 dynamicWindow = false 143 } 144 icwz := int32(initialWindowSize) 145 if config.InitialConnWindowSize >= defaultWindowSize { 146 icwz = config.InitialConnWindowSize 147 dynamicWindow = false 148 } 149 if iwz != defaultWindowSize { 150 isettings = append(isettings, http2.Setting{ 151 ID: http2.SettingInitialWindowSize, 152 Val: uint32(iwz)}) 153 } 154 if err := framer.writeSettings(true, isettings...); err != nil { 155 return nil, connectionErrorf(true, err, "transport: %v", err) 156 } 157 // Adjust the connection flow control window if needed. 
158 if delta := uint32(icwz - defaultWindowSize); delta > 0 { 159 if err := framer.writeWindowUpdate(true, 0, delta); err != nil { 160 return nil, connectionErrorf(true, err, "transport: %v", err) 161 } 162 } 163 kp := config.KeepaliveParams 164 if kp.MaxConnectionIdle == 0 { 165 kp.MaxConnectionIdle = defaultMaxConnectionIdle 166 } 167 if kp.MaxConnectionAge == 0 { 168 kp.MaxConnectionAge = defaultMaxConnectionAge 169 } 170 // Add a jitter to MaxConnectionAge. 171 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) 172 if kp.MaxConnectionAgeGrace == 0 { 173 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace 174 } 175 if kp.Time == 0 { 176 kp.Time = defaultServerKeepaliveTime 177 } 178 if kp.Timeout == 0 { 179 kp.Timeout = defaultServerKeepaliveTimeout 180 } 181 kep := config.KeepalivePolicy 182 if kep.MinTime == 0 { 183 kep.MinTime = defaultKeepalivePolicyMinTime 184 } 185 var buf bytes.Buffer 186 t := &http2Server{ 187 ctx: context.Background(), 188 conn: conn, 189 remoteAddr: conn.RemoteAddr(), 190 localAddr: conn.LocalAddr(), 191 authInfo: config.AuthInfo, 192 framer: framer, 193 hBuf: &buf, 194 hEnc: hpack.NewEncoder(&buf), 195 maxStreams: maxStreams, 196 inTapHandle: config.InTapHandle, 197 controlBuf: newControlBuffer(), 198 fc: &inFlow{limit: uint32(icwz)}, 199 sendQuotaPool: newQuotaPool(defaultWindowSize), 200 state: reachable, 201 writableChan: make(chan int, 1), 202 shutdownChan: make(chan struct{}), 203 activeStreams: make(map[uint32]*Stream), 204 streamSendQuota: defaultWindowSize, 205 stats: config.StatsHandler, 206 kp: kp, 207 idle: time.Now(), 208 kep: kep, 209 initialWindowSize: iwz, 210 } 211 if dynamicWindow { 212 t.bdpEst = &bdpEstimator{ 213 bdp: initialWindowSize, 214 updateFlowControl: t.updateFlowControl, 215 } 216 } 217 if t.stats != nil { 218 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ 219 RemoteAddr: t.remoteAddr, 220 LocalAddr: t.localAddr, 221 }) 222 connBegin := &stats.ConnBegin{} 223 t.stats.HandleConn(t.ctx, connBegin) 
224 } 225 go t.controller() 226 go t.keepalive() 227 t.writableChan <- 0 228 return t, nil 229} 230 231// operateHeader takes action on the decoded headers. 232func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) { 233 streamID := frame.Header().StreamID 234 235 var state decodeState 236 for _, hf := range frame.Fields { 237 if err := state.processHeaderField(hf); err != nil { 238 if se, ok := err.(StreamError); ok { 239 t.controlBuf.put(&resetStream{streamID, statusCodeConvTab[se.Code]}) 240 } 241 return 242 } 243 } 244 245 buf := newRecvBuffer() 246 s := &Stream{ 247 id: streamID, 248 st: t, 249 buf: buf, 250 fc: &inFlow{limit: uint32(t.initialWindowSize)}, 251 recvCompress: state.encoding, 252 method: state.method, 253 } 254 255 if frame.StreamEnded() { 256 // s is just created by the caller. No lock needed. 257 s.state = streamReadDone 258 } 259 if state.timeoutSet { 260 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout) 261 } else { 262 s.ctx, s.cancel = context.WithCancel(t.ctx) 263 } 264 pr := &peer.Peer{ 265 Addr: t.remoteAddr, 266 } 267 // Attach Auth info if there is any. 268 if t.authInfo != nil { 269 pr.AuthInfo = t.authInfo 270 } 271 s.ctx = peer.NewContext(s.ctx, pr) 272 // Cache the current stream to the context so that the server application 273 // can find out. Required when the server wants to send some metadata 274 // back to the client (unary call only). 275 s.ctx = newContextWithStream(s.ctx, s) 276 // Attach the received metadata to the context. 
277 if len(state.mdata) > 0 { 278 s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata) 279 } 280 if state.statsTags != nil { 281 s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags) 282 } 283 if state.statsTrace != nil { 284 s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace) 285 } 286 if t.inTapHandle != nil { 287 var err error 288 info := &tap.Info{ 289 FullMethodName: state.method, 290 } 291 s.ctx, err = t.inTapHandle(s.ctx, info) 292 if err != nil { 293 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) 294 t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) 295 return 296 } 297 } 298 t.mu.Lock() 299 if t.state != reachable { 300 t.mu.Unlock() 301 return 302 } 303 if uint32(len(t.activeStreams)) >= t.maxStreams { 304 t.mu.Unlock() 305 t.controlBuf.put(&resetStream{streamID, http2.ErrCodeRefusedStream}) 306 return 307 } 308 if streamID%2 != 1 || streamID <= t.maxStreamID { 309 t.mu.Unlock() 310 // illegal gRPC stream id. 311 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) 312 return true 313 } 314 t.maxStreamID = streamID 315 s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota)) 316 t.activeStreams[streamID] = s 317 if len(t.activeStreams) == 1 { 318 t.idle = time.Time{} 319 } 320 t.mu.Unlock() 321 s.requestRead = func(n int) { 322 t.adjustWindow(s, uint32(n)) 323 } 324 s.ctx = traceCtx(s.ctx, s.method) 325 if t.stats != nil { 326 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) 327 inHeader := &stats.InHeader{ 328 FullMethod: s.method, 329 RemoteAddr: t.remoteAddr, 330 LocalAddr: t.localAddr, 331 Compression: s.recvCompress, 332 WireLength: int(frame.Header().Length), 333 } 334 t.stats.HandleRPC(s.ctx, inHeader) 335 } 336 s.trReader = &transportReader{ 337 reader: &recvBufferReader{ 338 ctx: s.ctx, 339 recv: s.buf, 340 }, 341 windowHandler: func(n int) { 342 t.updateWindow(s, uint32(n)) 343 }, 344 } 345 handle(s) 346 return 347} 348 
349// HandleStreams receives incoming streams using the given handler. This is 350// typically run in a separate goroutine. 351// traceCtx attaches trace to ctx and returns the new context. 352func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { 353 // Check the validity of client preface. 354 preface := make([]byte, len(clientPreface)) 355 if _, err := io.ReadFull(t.conn, preface); err != nil { 356 // Only log if it isn't a simple tcp accept check (ie: tcp balancer doing open/close socket) 357 if err != io.EOF { 358 errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) 359 } 360 t.Close() 361 return 362 } 363 if !bytes.Equal(preface, clientPreface) { 364 errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) 365 t.Close() 366 return 367 } 368 369 frame, err := t.framer.readFrame() 370 if err == io.EOF || err == io.ErrUnexpectedEOF { 371 t.Close() 372 return 373 } 374 if err != nil { 375 errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err) 376 t.Close() 377 return 378 } 379 atomic.StoreUint32(&t.activity, 1) 380 sf, ok := frame.(*http2.SettingsFrame) 381 if !ok { 382 errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) 383 t.Close() 384 return 385 } 386 t.handleSettings(sf) 387 388 for { 389 frame, err := t.framer.readFrame() 390 atomic.StoreUint32(&t.activity, 1) 391 if err != nil { 392 if se, ok := err.(http2.StreamError); ok { 393 t.mu.Lock() 394 s := t.activeStreams[se.StreamID] 395 t.mu.Unlock() 396 if s != nil { 397 t.closeStream(s) 398 } 399 t.controlBuf.put(&resetStream{se.StreamID, se.Code}) 400 continue 401 } 402 if err == io.EOF || err == io.ErrUnexpectedEOF { 403 t.Close() 404 return 405 } 406 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) 407 t.Close() 408 return 409 } 410 switch frame := 
frame.(type) { 411 case *http2.MetaHeadersFrame: 412 if t.operateHeaders(frame, handle, traceCtx) { 413 t.Close() 414 break 415 } 416 case *http2.DataFrame: 417 t.handleData(frame) 418 case *http2.RSTStreamFrame: 419 t.handleRSTStream(frame) 420 case *http2.SettingsFrame: 421 t.handleSettings(frame) 422 case *http2.PingFrame: 423 t.handlePing(frame) 424 case *http2.WindowUpdateFrame: 425 t.handleWindowUpdate(frame) 426 case *http2.GoAwayFrame: 427 // TODO: Handle GoAway from the client appropriately. 428 default: 429 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) 430 } 431 } 432} 433 434func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { 435 t.mu.Lock() 436 defer t.mu.Unlock() 437 if t.activeStreams == nil { 438 // The transport is closing. 439 return nil, false 440 } 441 s, ok := t.activeStreams[f.Header().StreamID] 442 if !ok { 443 // The stream is already done. 444 return nil, false 445 } 446 return s, true 447} 448 449// adjustWindow sends out extra window update over the initial window size 450// of stream if the application is requesting data larger in size than 451// the window. 452func (t *http2Server) adjustWindow(s *Stream, n uint32) { 453 s.mu.Lock() 454 defer s.mu.Unlock() 455 if s.state == streamDone { 456 return 457 } 458 if w := s.fc.maybeAdjust(n); w > 0 { 459 if cw := t.fc.resetPendingUpdate(); cw > 0 { 460 t.controlBuf.put(&windowUpdate{0, cw, false}) 461 } 462 t.controlBuf.put(&windowUpdate{s.id, w, true}) 463 } 464} 465 466// updateWindow adjusts the inbound quota for the stream and the transport. 467// Window updates will deliver to the controller for sending when 468// the cumulative quota exceeds the corresponding threshold. 
469func (t *http2Server) updateWindow(s *Stream, n uint32) { 470 s.mu.Lock() 471 defer s.mu.Unlock() 472 if s.state == streamDone { 473 return 474 } 475 if w := s.fc.onRead(n); w > 0 { 476 if cw := t.fc.resetPendingUpdate(); cw > 0 { 477 t.controlBuf.put(&windowUpdate{0, cw, false}) 478 } 479 t.controlBuf.put(&windowUpdate{s.id, w, true}) 480 } 481} 482 483// updateFlowControl updates the incoming flow control windows 484// for the transport and the stream based on the current bdp 485// estimation. 486func (t *http2Server) updateFlowControl(n uint32) { 487 t.mu.Lock() 488 for _, s := range t.activeStreams { 489 s.fc.newLimit(n) 490 } 491 t.initialWindowSize = int32(n) 492 t.mu.Unlock() 493 t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false}) 494 t.controlBuf.put(&settings{ 495 ack: false, 496 ss: []http2.Setting{ 497 { 498 ID: http2.SettingInitialWindowSize, 499 Val: uint32(n), 500 }, 501 }, 502 }) 503 504} 505 506func (t *http2Server) handleData(f *http2.DataFrame) { 507 size := f.Header().Length 508 var sendBDPPing bool 509 if t.bdpEst != nil { 510 sendBDPPing = t.bdpEst.add(uint32(size)) 511 } 512 // Decouple connection's flow control from application's read. 513 // An update on connection's flow control should not depend on 514 // whether user application has read the data or not. Such a 515 // restriction is already imposed on the stream's flow control, 516 // and therefore the sender will be blocked anyways. 517 // Decoupling the connection flow control will prevent other 518 // active(fast) streams from starving in presence of slow or 519 // inactive streams. 520 // 521 // Furthermore, if a bdpPing is being sent out we can piggyback 522 // connection's window update for the bytes we just received. 
523 if sendBDPPing { 524 t.controlBuf.put(&windowUpdate{0, uint32(size), false}) 525 t.controlBuf.put(bdpPing) 526 } else { 527 if err := t.fc.onData(uint32(size)); err != nil { 528 errorf("transport: http2Server %v", err) 529 t.Close() 530 return 531 } 532 if w := t.fc.onRead(uint32(size)); w > 0 { 533 t.controlBuf.put(&windowUpdate{0, w, true}) 534 } 535 } 536 // Select the right stream to dispatch. 537 s, ok := t.getStream(f) 538 if !ok { 539 return 540 } 541 if size > 0 { 542 s.mu.Lock() 543 if s.state == streamDone { 544 s.mu.Unlock() 545 return 546 } 547 if err := s.fc.onData(uint32(size)); err != nil { 548 s.mu.Unlock() 549 t.closeStream(s) 550 t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) 551 return 552 } 553 if f.Header().Flags.Has(http2.FlagDataPadded) { 554 if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { 555 t.controlBuf.put(&windowUpdate{s.id, w, true}) 556 } 557 } 558 s.mu.Unlock() 559 // TODO(bradfitz, zhaoq): A copy is required here because there is no 560 // guarantee f.Data() is consumed before the arrival of next frame. 561 // Can this copy be eliminated? 562 if len(f.Data()) > 0 { 563 data := make([]byte, len(f.Data())) 564 copy(data, f.Data()) 565 s.write(recvMsg{data: data}) 566 } 567 } 568 if f.Header().Flags.Has(http2.FlagDataEndStream) { 569 // Received the end of stream from the client. 570 s.mu.Lock() 571 if s.state != streamDone { 572 s.state = streamReadDone 573 } 574 s.mu.Unlock() 575 s.write(recvMsg{err: io.EOF}) 576 } 577} 578 579func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { 580 s, ok := t.getStream(f) 581 if !ok { 582 return 583 } 584 t.closeStream(s) 585} 586 587func (t *http2Server) handleSettings(f *http2.SettingsFrame) { 588 if f.IsAck() { 589 return 590 } 591 var ss []http2.Setting 592 f.ForeachSetting(func(s http2.Setting) error { 593 ss = append(ss, s) 594 return nil 595 }) 596 // The settings will be applied once the ack is sent. 
597 t.controlBuf.put(&settings{ack: true, ss: ss}) 598} 599 600const ( 601 maxPingStrikes = 2 602 defaultPingTimeout = 2 * time.Hour 603) 604 605func (t *http2Server) handlePing(f *http2.PingFrame) { 606 if f.IsAck() { 607 if f.Data == goAwayPing.data && t.drainChan != nil { 608 close(t.drainChan) 609 return 610 } 611 // Maybe it's a BDP ping. 612 if t.bdpEst != nil { 613 t.bdpEst.calculate(f.Data) 614 } 615 return 616 } 617 pingAck := &ping{ack: true} 618 copy(pingAck.data[:], f.Data[:]) 619 t.controlBuf.put(pingAck) 620 621 now := time.Now() 622 defer func() { 623 t.lastPingAt = now 624 }() 625 // A reset ping strikes means that we don't need to check for policy 626 // violation for this ping and the pingStrikes counter should be set 627 // to 0. 628 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { 629 t.pingStrikes = 0 630 return 631 } 632 t.mu.Lock() 633 ns := len(t.activeStreams) 634 t.mu.Unlock() 635 if ns < 1 && !t.kep.PermitWithoutStream { 636 // Keepalive shouldn't be active thus, this new ping should 637 // have come after atleast defaultPingTimeout. 638 if t.lastPingAt.Add(defaultPingTimeout).After(now) { 639 t.pingStrikes++ 640 } 641 } else { 642 // Check if keepalive policy is respected. 643 if t.lastPingAt.Add(t.kep.MinTime).After(now) { 644 t.pingStrikes++ 645 } 646 } 647 648 if t.pingStrikes > maxPingStrikes { 649 // Send goaway and close the connection. 
650 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) 651 } 652} 653 654func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { 655 id := f.Header().StreamID 656 incr := f.Increment 657 if id == 0 { 658 t.sendQuotaPool.add(int(incr)) 659 return 660 } 661 if s, ok := t.getStream(f); ok { 662 s.sendQuotaPool.add(int(incr)) 663 } 664} 665 666func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error { 667 first := true 668 endHeaders := false 669 var err error 670 defer func() { 671 if err == nil { 672 // Reset ping strikes when seding headers since that might cause the 673 // peer to send ping. 674 atomic.StoreUint32(&t.resetPingStrikes, 1) 675 } 676 }() 677 // Sends the headers in a single batch. 678 for !endHeaders { 679 size := t.hBuf.Len() 680 if size > http2MaxFrameLen { 681 size = http2MaxFrameLen 682 } else { 683 endHeaders = true 684 } 685 if first { 686 p := http2.HeadersFrameParam{ 687 StreamID: s.id, 688 BlockFragment: b.Next(size), 689 EndStream: endStream, 690 EndHeaders: endHeaders, 691 } 692 err = t.framer.writeHeaders(endHeaders, p) 693 first = false 694 } else { 695 err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size)) 696 } 697 if err != nil { 698 t.Close() 699 return connectionErrorf(true, err, "transport: %v", err) 700 } 701 } 702 return nil 703} 704 705// WriteHeader sends the header metedata md back to the client. 
706func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { 707 s.mu.Lock() 708 if s.headerOk || s.state == streamDone { 709 s.mu.Unlock() 710 return ErrIllegalHeaderWrite 711 } 712 s.headerOk = true 713 if md.Len() > 0 { 714 if s.header.Len() > 0 { 715 s.header = metadata.Join(s.header, md) 716 } else { 717 s.header = md 718 } 719 } 720 md = s.header 721 s.mu.Unlock() 722 if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { 723 return err 724 } 725 t.hBuf.Reset() 726 t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) 727 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) 728 if s.sendCompress != "" { 729 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) 730 } 731 for k, vv := range md { 732 if isReservedHeader(k) { 733 // Clients don't tolerate reading restricted headers after some non restricted ones were sent. 734 continue 735 } 736 for _, v := range vv { 737 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 738 } 739 } 740 bufLen := t.hBuf.Len() 741 if err := t.writeHeaders(s, t.hBuf, false); err != nil { 742 return err 743 } 744 if t.stats != nil { 745 outHeader := &stats.OutHeader{ 746 WireLength: bufLen, 747 } 748 t.stats.HandleRPC(s.Context(), outHeader) 749 } 750 t.writableChan <- 0 751 return nil 752} 753 754// WriteStatus sends stream status to the client and terminates the stream. 755// There is no further I/O operations being able to perform on this stream. 756// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early 757// OK is adopted. 
758func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { 759 var headersSent, hasHeader bool 760 s.mu.Lock() 761 if s.state == streamDone { 762 s.mu.Unlock() 763 return nil 764 } 765 if s.headerOk { 766 headersSent = true 767 } 768 if s.header.Len() > 0 { 769 hasHeader = true 770 } 771 s.mu.Unlock() 772 773 if !headersSent && hasHeader { 774 t.WriteHeader(s, nil) 775 headersSent = true 776 } 777 778 // Always write a status regardless of context cancellation unless the stream 779 // is terminated (e.g. by a RST_STREAM, GOAWAY, or transport error). The 780 // server's application code is already done so it is fine to ignore s.ctx. 781 select { 782 case <-t.shutdownChan: 783 return ErrConnClosing 784 case <-t.writableChan: 785 } 786 t.hBuf.Reset() 787 if !headersSent { 788 t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) 789 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) 790 } 791 t.hEnc.WriteField( 792 hpack.HeaderField{ 793 Name: "grpc-status", 794 Value: strconv.Itoa(int(st.Code())), 795 }) 796 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) 797 798 if p := st.Proto(); p != nil && len(p.Details) > 0 { 799 stBytes, err := proto.Marshal(p) 800 if err != nil { 801 // TODO: return error instead, when callers are able to handle it. 802 panic(err) 803 } 804 805 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) 806 } 807 808 // Attach the trailer metadata. 809 for k, vv := range s.trailer { 810 // Clients don't tolerate reading restricted headers after some non restricted ones were sent. 
811 if isReservedHeader(k) { 812 continue 813 } 814 for _, v := range vv { 815 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 816 } 817 } 818 bufLen := t.hBuf.Len() 819 if err := t.writeHeaders(s, t.hBuf, true); err != nil { 820 t.Close() 821 return err 822 } 823 if t.stats != nil { 824 outTrailer := &stats.OutTrailer{ 825 WireLength: bufLen, 826 } 827 t.stats.HandleRPC(s.Context(), outTrailer) 828 } 829 t.closeStream(s) 830 t.writableChan <- 0 831 return nil 832} 833 834// Write converts the data into HTTP2 data frame and sends it out. Non-nil error 835// is returns if it fails (e.g., framing error, transport error). 836func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (err error) { 837 // TODO(zhaoq): Support multi-writers for a single stream. 838 secondStart := http2MaxFrameLen - len(hdr)%http2MaxFrameLen 839 if len(data) < secondStart { 840 secondStart = len(data) 841 } 842 hdr = append(hdr, data[:secondStart]...) 843 data = data[secondStart:] 844 isLastSlice := (len(data) == 0) 845 var writeHeaderFrame bool 846 s.mu.Lock() 847 if s.state == streamDone { 848 s.mu.Unlock() 849 return streamErrorf(codes.Unknown, "the stream has been done") 850 } 851 if !s.headerOk { 852 writeHeaderFrame = true 853 } 854 s.mu.Unlock() 855 if writeHeaderFrame { 856 t.WriteHeader(s, nil) 857 } 858 r := bytes.NewBuffer(hdr) 859 var ( 860 p []byte 861 oqv uint32 862 ) 863 for { 864 if r.Len() == 0 && p == nil { 865 return nil 866 } 867 oqv = atomic.LoadUint32(&t.outQuotaVersion) 868 size := http2MaxFrameLen 869 // Wait until the stream has some quota to send the data. 870 sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire()) 871 if err != nil { 872 return err 873 } 874 // Wait until the transport has some quota to send the data. 
875 tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire()) 876 if err != nil { 877 return err 878 } 879 if sq < size { 880 size = sq 881 } 882 if tq < size { 883 size = tq 884 } 885 if p == nil { 886 p = r.Next(size) 887 } 888 ps := len(p) 889 if ps < sq { 890 // Overbooked stream quota. Return it back. 891 s.sendQuotaPool.add(sq - ps) 892 } 893 if ps < tq { 894 // Overbooked transport quota. Return it back. 895 t.sendQuotaPool.add(tq - ps) 896 } 897 t.framer.adjustNumWriters(1) 898 // Got some quota. Try to acquire writing privilege on the 899 // transport. 900 if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { 901 if _, ok := err.(StreamError); ok { 902 // Return the connection quota back. 903 t.sendQuotaPool.add(ps) 904 } 905 if t.framer.adjustNumWriters(-1) == 0 { 906 // This writer is the last one in this batch and has the 907 // responsibility to flush the buffered frames. It queues 908 // a flush request to controlBuf instead of flushing directly 909 // in order to avoid the race with other writing or flushing. 910 t.controlBuf.put(&flushIO{}) 911 } 912 return err 913 } 914 select { 915 case <-s.ctx.Done(): 916 t.sendQuotaPool.add(ps) 917 if t.framer.adjustNumWriters(-1) == 0 { 918 t.controlBuf.put(&flushIO{}) 919 } 920 t.writableChan <- 0 921 return ContextErr(s.ctx.Err()) 922 default: 923 } 924 if oqv != atomic.LoadUint32(&t.outQuotaVersion) { 925 // InitialWindowSize settings frame must have been received after we 926 // acquired send quota but before we got the writable channel. 927 // We must forsake this write. 
928 t.sendQuotaPool.add(ps) 929 s.sendQuotaPool.add(ps) 930 if t.framer.adjustNumWriters(-1) == 0 { 931 t.controlBuf.put(&flushIO{}) 932 } 933 t.writableChan <- 0 934 continue 935 } 936 var forceFlush bool 937 if r.Len() == 0 { 938 if isLastSlice { 939 if t.framer.adjustNumWriters(0) == 1 && !opts.Last { 940 forceFlush = true 941 } 942 } else { 943 r = bytes.NewBuffer(data) 944 isLastSlice = true 945 } 946 } 947 // Reset ping strikes when sending data since this might cause 948 // the peer to send ping. 949 atomic.StoreUint32(&t.resetPingStrikes, 1) 950 if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil { 951 t.Close() 952 return connectionErrorf(true, err, "transport: %v", err) 953 } 954 p = nil 955 if t.framer.adjustNumWriters(-1) == 0 { 956 t.framer.flushWrite() 957 } 958 t.writableChan <- 0 959 } 960 961} 962 963func (t *http2Server) applySettings(ss []http2.Setting) { 964 for _, s := range ss { 965 if s.ID == http2.SettingInitialWindowSize { 966 t.mu.Lock() 967 defer t.mu.Unlock() 968 for _, stream := range t.activeStreams { 969 stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota)) 970 } 971 t.streamSendQuota = s.Val 972 atomic.AddUint32(&t.outQuotaVersion, 1) 973 } 974 975 } 976} 977 978// keepalive running in a separate goroutine does the following: 979// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. 980// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. 981// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. 982// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection 983// after an additional duration of keepalive.Timeout. 
func (t *http2Server) keepalive() {
	// p is the single ping item that is queued on controlBuf for every
	// keepalive probe.
	p := &ping{}
	// pingSent is true while a keepalive ping has been queued and no read
	// activity has been observed since.
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	defer func() {
		// Stop reports false when the timer has already expired or been
		// stopped; in that case the pending value is drained from its channel.
		// If that value was already consumed by the select below and the timer
		// was not reset, the receive here would block forever.
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// val is how much of the allowed idle period is still left.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			// Not idle for long enough yet; wake up again when the remainder elapses.
			maxIdle.Reset(val)
		case <-maxAge.C:
			// Max connection age reached: start a graceful close, then reuse
			// the same timer to wait out the grace period.
			t.drain(http2.ErrCodeNo, []byte{})
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.shutdownChan:
				// The transport was closed during the grace period; maxAge is
				// still pending, so the clean-up can stop it without draining.
			}
			return
		case <-keepalive.C:
			// If there was read activity since the last check, clear the flag
			// and start a fresh keepalive interval.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			if pingSent {
				// A ping was already sent and no activity was seen within
				// t.kp.Timeout: close the connection.
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			// No activity during the interval: queue a ping and wait up to
			// t.kp.Timeout for activity.
			pingSent = true
			t.controlBuf.put(p)
			keepalive.Reset(t.kp.Timeout)
		case <-t.shutdownChan:
			return
		}
	}
}

// goAwayPing is the ping written right after the first GoAway frame of a
// graceful close (see the *goAway case in controller).
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the peer
// (the client). It takes one item at a time from controlBuf, acquires the
// transport write lock by receiving from writableChan, writes the matching
// frame and releases the lock by sending on writableChan. It returns when
// shutdownChan is closed or when a goAway item is seen while the transport is
// already closing.
func (t *http2Server) controller() {
	for {
		select {
		case i := <-t.controlBuf.get():
			t.controlBuf.load()
			select {
			case <-t.writableChan:
				switch i := i.(type) {
				case *windowUpdate:
					t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
				case *settings:
					if i.ack {
						// Acknowledge the settings first, then apply them locally.
						t.framer.writeSettingsAck(true)
						t.applySettings(i.ss)
					} else {
						t.framer.writeSettings(true, i.ss...)
					}
				case *resetStream:
					t.framer.writeRSTStream(true, i.streamID, i.code)
				case *goAway:
					t.mu.Lock()
					if t.state == closing {
						t.mu.Unlock()
						// The transport is closing.
						return
					}
					sid := t.maxStreamID
					if !i.headsUp {
						// Stop accepting more streams now.
						t.state = draining
						activeStreams := len(t.activeStreams)
						t.mu.Unlock()
						t.framer.writeGoAway(true, sid, i.code, i.debugData)
						if i.closeConn || activeStreams == 0 {
							// Abruptly close the connection following the GoAway.
							t.Close()
						}
						// Release the write lock here because the continue below
						// skips the release at the end of the switch.
						t.writableChan <- 0
						continue
					}
					t.mu.Unlock()
					// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
					// Follow that with a ping and wait for the ack to come back or a timer
					// to expire. During this time accept new streams since they might have
					// originated before the GoAway reaches the client.
					// After getting the ack or timer expiration send out another GoAway this
					// time with an ID of the max stream server intends to process.
					t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{})
					t.framer.writePing(true, false, goAwayPing.data)
					go func() {
						timer := time.NewTimer(time.Minute)
						defer timer.Stop()
						select {
						case <-t.drainChan:
						case <-timer.C:
						case <-t.shutdownChan:
							// The transport was closed; no second GoAway is needed.
							return
						}
						// Queue the second GoAway (headsUp unset), which is handled by
						// the !i.headsUp branch above.
						t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
					}()
				case *flushIO:
					t.framer.flushWrite()
				case *ping:
					if !i.ack {
						// Only pings that are not acks are passed to the BDP estimator.
						t.bdpEst.timesnap(i.data)
					}
					t.framer.writePing(true, i.ack, i.data)
				default:
					errorf("transport: http2Server.controller got unexpected item type %v\n", i)
				}
				// Release the write lock.
				t.writableChan <- 0
				continue
			case <-t.shutdownChan:
				return
			}
		case <-t.shutdownChan:
			return
		}
	}
}

// Close starts shutting down the http2Server transport.
// It returns an error if Close was already called; otherwise it returns the
// result of closing the underlying connection.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() (err error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return errors.New("transport: Close() was already called")
	}
	t.state = closing
	// Take ownership of the active streams so they can be cancelled after the
	// lock has been released.
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	// Closing shutdownChan unblocks every goroutine that selects on it.
	close(t.shutdownChan)
	err = t.conn.Close()
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
	if t.stats != nil {
		connEnd := &stats.ConnEnd{}
		t.stats.HandleConn(t.ctx, connEnd)
	}
	return
}

// closeStream clears the footprint of a stream when the stream is not needed
// any more.
1171func (t *http2Server) closeStream(s *Stream) { 1172 t.mu.Lock() 1173 delete(t.activeStreams, s.id) 1174 if len(t.activeStreams) == 0 { 1175 t.idle = time.Now() 1176 } 1177 if t.state == draining && len(t.activeStreams) == 0 { 1178 defer t.Close() 1179 } 1180 t.mu.Unlock() 1181 // In case stream sending and receiving are invoked in separate 1182 // goroutines (e.g., bi-directional streaming), cancel needs to be 1183 // called to interrupt the potential blocking on other goroutines. 1184 s.cancel() 1185 s.mu.Lock() 1186 if s.state == streamDone { 1187 s.mu.Unlock() 1188 return 1189 } 1190 s.state = streamDone 1191 s.mu.Unlock() 1192} 1193 1194func (t *http2Server) RemoteAddr() net.Addr { 1195 return t.remoteAddr 1196} 1197 1198func (t *http2Server) Drain() { 1199 t.drain(http2.ErrCodeNo, []byte{}) 1200} 1201 1202func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { 1203 t.mu.Lock() 1204 defer t.mu.Unlock() 1205 if t.drainChan != nil { 1206 return 1207 } 1208 t.drainChan = make(chan struct{}) 1209 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) 1210} 1211 1212var rgen = rand.New(rand.NewSource(time.Now().UnixNano())) 1213 1214func getJitter(v time.Duration) time.Duration { 1215 if v == infinity { 1216 return 0 1217 } 1218 // Generate a jitter between +/- 10% of the value. 1219 r := int64(v / 10) 1220 j := rgen.Int63n(2*r) - r 1221 return time.Duration(j) 1222} 1223