1/* 2 * 3 * Copyright 2014, Google Inc. 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are 8 * met: 9 * 10 * * Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * * Redistributions in binary form must reproduce the above 13 * copyright notice, this list of conditions and the following disclaimer 14 * in the documentation and/or other materials provided with the 15 * distribution. 16 * * Neither the name of Google Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 * 32 */ 33 34package transport 35 36import ( 37 "bytes" 38 "errors" 39 "io" 40 "math" 41 "net" 42 "strconv" 43 "sync" 44 45 "golang.org/x/net/context" 46 "golang.org/x/net/http2" 47 "golang.org/x/net/http2/hpack" 48 "google.golang.org/grpc/codes" 49 "google.golang.org/grpc/credentials" 50 "google.golang.org/grpc/grpclog" 51 "google.golang.org/grpc/metadata" 52 "google.golang.org/grpc/peer" 53) 54 55// ErrIllegalHeaderWrite indicates that setting header is illegal because of 56// the stream's state. 57var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called") 58 59// http2Server implements the ServerTransport interface with HTTP2. 60type http2Server struct { 61 conn net.Conn 62 maxStreamID uint32 // max stream ID ever seen 63 authInfo credentials.AuthInfo // auth info about the connection 64 // writableChan synchronizes write access to the transport. 65 // A writer acquires the write lock by receiving a value on writableChan 66 // and releases it by sending on writableChan. 67 writableChan chan int 68 // shutdownChan is closed when Close is called. 69 // Blocking operations should select on shutdownChan to avoid 70 // blocking forever after Close. 71 shutdownChan chan struct{} 72 framer *framer 73 hBuf *bytes.Buffer // the buffer for HPACK encoding 74 hEnc *hpack.Encoder // HPACK encoder 75 76 // The max number of concurrent streams. 77 maxStreams uint32 78 // controlBuf delivers all the control related tasks (e.g., window 79 // updates, reset streams, and various settings) to the controller. 80 controlBuf *recvBuffer 81 fc *inFlow 82 // sendQuotaPool provides flow control to outbound message. 83 sendQuotaPool *quotaPool 84 85 mu sync.Mutex // guard the following 86 state transportState 87 activeStreams map[uint32]*Stream 88 // the per-stream outbound flow control window size set by the peer. 89 streamSendQuota uint32 90} 91 92// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is 93// returned if something goes wrong. 94func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) { 95 framer := newFramer(conn) 96 // Send initial settings as connection preface to client. 97 var settings []http2.Setting 98 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is 99 // permitted in the HTTP2 spec. 100 if maxStreams == 0 { 101 maxStreams = math.MaxUint32 102 } else { 103 settings = append(settings, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams}) 104 } 105 if initialWindowSize != defaultWindowSize { 106 settings = append(settings, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)}) 107 } 108 if err := framer.writeSettings(true, settings...); err != nil { 109 return nil, ConnectionErrorf("transport: %v", err) 110 } 111 // Adjust the connection flow control window if needed. 112 if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { 113 if err := framer.writeWindowUpdate(true, 0, delta); err != nil { 114 return nil, ConnectionErrorf("transport: %v", err) 115 } 116 } 117 var buf bytes.Buffer 118 t := &http2Server{ 119 conn: conn, 120 authInfo: authInfo, 121 framer: framer, 122 hBuf: &buf, 123 hEnc: hpack.NewEncoder(&buf), 124 maxStreams: maxStreams, 125 controlBuf: newRecvBuffer(), 126 fc: &inFlow{limit: initialConnWindowSize}, 127 sendQuotaPool: newQuotaPool(defaultWindowSize), 128 state: reachable, 129 writableChan: make(chan int, 1), 130 shutdownChan: make(chan struct{}), 131 activeStreams: make(map[uint32]*Stream), 132 streamSendQuota: defaultWindowSize, 133 } 134 go t.controller() 135 t.writableChan <- 0 136 return t, nil 137} 138 139// operateHeader takes action on the decoded headers. 140func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) { 141 buf := newRecvBuffer() 142 fc := &inFlow{ 143 limit: initialWindowSize, 144 conn: t.fc, 145 } 146 s := &Stream{ 147 id: frame.Header().StreamID, 148 st: t, 149 buf: buf, 150 fc: fc, 151 } 152 153 var state decodeState 154 for _, hf := range frame.Fields { 155 state.processHeaderField(hf) 156 } 157 if err := state.err; err != nil { 158 if se, ok := err.(StreamError); ok { 159 t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]}) 160 } 161 return 162 } 163 164 if frame.StreamEnded() { 165 // s is just created by the caller. No lock needed. 166 s.state = streamReadDone 167 } 168 s.recvCompress = state.encoding 169 if state.timeoutSet { 170 s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout) 171 } else { 172 s.ctx, s.cancel = context.WithCancel(context.TODO()) 173 } 174 pr := &peer.Peer{ 175 Addr: t.conn.RemoteAddr(), 176 } 177 // Attach Auth info if there is any. 178 if t.authInfo != nil { 179 pr.AuthInfo = t.authInfo 180 } 181 s.ctx = peer.NewContext(s.ctx, pr) 182 // Cache the current stream to the context so that the server application 183 // can find out. Required when the server wants to send some metadata 184 // back to the client (unary call only). 185 s.ctx = newContextWithStream(s.ctx, s) 186 // Attach the received metadata to the context. 187 if len(state.mdata) > 0 { 188 s.ctx = metadata.NewContext(s.ctx, state.mdata) 189 } 190 191 s.dec = &recvBufferReader{ 192 ctx: s.ctx, 193 recv: s.buf, 194 } 195 s.recvCompress = state.encoding 196 s.method = state.method 197 t.mu.Lock() 198 if t.state != reachable { 199 t.mu.Unlock() 200 return 201 } 202 if uint32(len(t.activeStreams)) >= t.maxStreams { 203 t.mu.Unlock() 204 t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) 205 return 206 } 207 s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota)) 208 t.activeStreams[s.id] = s 209 t.mu.Unlock() 210 s.windowHandler = func(n int) { 211 t.updateWindow(s, uint32(n)) 212 } 213 handle(s) 214} 215 216// HandleStreams receives incoming streams using the given handler. This is 217// typically run in a separate goroutine. 218func (t *http2Server) HandleStreams(handle func(*Stream)) { 219 // Check the validity of client preface. 220 preface := make([]byte, len(clientPreface)) 221 if _, err := io.ReadFull(t.conn, preface); err != nil { 222 grpclog.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) 223 t.Close() 224 return 225 } 226 if !bytes.Equal(preface, clientPreface) { 227 grpclog.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) 228 t.Close() 229 return 230 } 231 232 frame, err := t.framer.readFrame() 233 if err != nil { 234 grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err) 235 t.Close() 236 return 237 } 238 sf, ok := frame.(*http2.SettingsFrame) 239 if !ok { 240 grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) 241 t.Close() 242 return 243 } 244 t.handleSettings(sf) 245 246 for { 247 frame, err := t.framer.readFrame() 248 if err != nil { 249 t.Close() 250 return 251 } 252 switch frame := frame.(type) { 253 case *http2.MetaHeadersFrame: 254 id := frame.Header().StreamID 255 if id%2 != 1 || id <= t.maxStreamID { 256 // illegal gRPC stream id. 257 grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id) 258 t.Close() 259 break 260 } 261 t.maxStreamID = id 262 t.operateHeaders(frame, handle) 263 case *http2.DataFrame: 264 t.handleData(frame) 265 case *http2.RSTStreamFrame: 266 t.handleRSTStream(frame) 267 case *http2.SettingsFrame: 268 t.handleSettings(frame) 269 case *http2.PingFrame: 270 t.handlePing(frame) 271 case *http2.WindowUpdateFrame: 272 t.handleWindowUpdate(frame) 273 case *http2.GoAwayFrame: 274 break 275 default: 276 grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) 277 } 278 } 279} 280 281func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { 282 t.mu.Lock() 283 defer t.mu.Unlock() 284 if t.activeStreams == nil { 285 // The transport is closing. 286 return nil, false 287 } 288 s, ok := t.activeStreams[f.Header().StreamID] 289 if !ok { 290 // The stream is already done. 291 return nil, false 292 } 293 return s, true 294} 295 296// updateWindow adjusts the inbound quota for the stream and the transport. 297// Window updates will deliver to the controller for sending when 298// the cumulative quota exceeds the corresponding threshold. 299func (t *http2Server) updateWindow(s *Stream, n uint32) { 300 swu, cwu := s.fc.onRead(n) 301 if swu > 0 { 302 t.controlBuf.put(&windowUpdate{s.id, swu}) 303 } 304 if cwu > 0 { 305 t.controlBuf.put(&windowUpdate{0, cwu}) 306 } 307} 308 309func (t *http2Server) handleData(f *http2.DataFrame) { 310 // Select the right stream to dispatch. 311 s, ok := t.getStream(f) 312 if !ok { 313 return 314 } 315 size := len(f.Data()) 316 if size > 0 { 317 if err := s.fc.onData(uint32(size)); err != nil { 318 if _, ok := err.(ConnectionError); ok { 319 grpclog.Printf("transport: http2Server %v", err) 320 t.Close() 321 return 322 } 323 t.closeStream(s) 324 t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) 325 return 326 } 327 // TODO(bradfitz, zhaoq): A copy is required here because there is no 328 // guarantee f.Data() is consumed before the arrival of next frame. 329 // Can this copy be eliminated? 330 data := make([]byte, size) 331 copy(data, f.Data()) 332 s.write(recvMsg{data: data}) 333 } 334 if f.Header().Flags.Has(http2.FlagDataEndStream) { 335 // Received the end of stream from the client. 336 s.mu.Lock() 337 if s.state != streamDone { 338 if s.state == streamWriteDone { 339 s.state = streamDone 340 } else { 341 s.state = streamReadDone 342 } 343 } 344 s.mu.Unlock() 345 s.write(recvMsg{err: io.EOF}) 346 } 347} 348 349func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { 350 s, ok := t.getStream(f) 351 if !ok { 352 return 353 } 354 t.closeStream(s) 355} 356 357func (t *http2Server) handleSettings(f *http2.SettingsFrame) { 358 if f.IsAck() { 359 return 360 } 361 var ss []http2.Setting 362 f.ForeachSetting(func(s http2.Setting) error { 363 ss = append(ss, s) 364 return nil 365 }) 366 // The settings will be applied once the ack is sent. 367 t.controlBuf.put(&settings{ack: true, ss: ss}) 368} 369 370func (t *http2Server) handlePing(f *http2.PingFrame) { 371 pingAck := &ping{ack: true} 372 copy(pingAck.data[:], f.Data[:]) 373 t.controlBuf.put(pingAck) 374} 375 376func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { 377 id := f.Header().StreamID 378 incr := f.Increment 379 if id == 0 { 380 t.sendQuotaPool.add(int(incr)) 381 return 382 } 383 if s, ok := t.getStream(f); ok { 384 s.sendQuotaPool.add(int(incr)) 385 } 386} 387 388func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error { 389 first := true 390 endHeaders := false 391 var err error 392 // Sends the headers in a single batch. 393 for !endHeaders { 394 size := t.hBuf.Len() 395 if size > http2MaxFrameLen { 396 size = http2MaxFrameLen 397 } else { 398 endHeaders = true 399 } 400 if first { 401 p := http2.HeadersFrameParam{ 402 StreamID: s.id, 403 BlockFragment: b.Next(size), 404 EndStream: endStream, 405 EndHeaders: endHeaders, 406 } 407 err = t.framer.writeHeaders(endHeaders, p) 408 first = false 409 } else { 410 err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size)) 411 } 412 if err != nil { 413 t.Close() 414 return ConnectionErrorf("transport: %v", err) 415 } 416 } 417 return nil 418} 419 420// WriteHeader sends the header metedata md back to the client. 421func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { 422 s.mu.Lock() 423 if s.headerOk || s.state == streamDone { 424 s.mu.Unlock() 425 return ErrIllegalHeaderWrite 426 } 427 s.headerOk = true 428 s.mu.Unlock() 429 if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { 430 return err 431 } 432 t.hBuf.Reset() 433 t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) 434 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) 435 if s.sendCompress != "" { 436 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) 437 } 438 for k, v := range md { 439 for _, entry := range v { 440 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry}) 441 } 442 } 443 if err := t.writeHeaders(s, t.hBuf, false); err != nil { 444 return err 445 } 446 t.writableChan <- 0 447 return nil 448} 449 450// WriteStatus sends stream status to the client and terminates the stream. 451// There is no further I/O operations being able to perform on this stream. 452// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early 453// OK is adopted. 454func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error { 455 var headersSent bool 456 s.mu.Lock() 457 if s.state == streamDone { 458 s.mu.Unlock() 459 return nil 460 } 461 if s.headerOk { 462 headersSent = true 463 } 464 s.mu.Unlock() 465 if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { 466 return err 467 } 468 t.hBuf.Reset() 469 if !headersSent { 470 t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) 471 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) 472 } 473 t.hEnc.WriteField( 474 hpack.HeaderField{ 475 Name: "grpc-status", 476 Value: strconv.Itoa(int(statusCode)), 477 }) 478 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: statusDesc}) 479 // Attach the trailer metadata. 480 for k, v := range s.trailer { 481 for _, entry := range v { 482 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry}) 483 } 484 } 485 if err := t.writeHeaders(s, t.hBuf, true); err != nil { 486 t.Close() 487 return err 488 } 489 t.closeStream(s) 490 t.writableChan <- 0 491 return nil 492} 493 494// Write converts the data into HTTP2 data frame and sends it out. Non-nil error 495// is returns if it fails (e.g., framing error, transport error). 496func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { 497 // TODO(zhaoq): Support multi-writers for a single stream. 498 var writeHeaderFrame bool 499 s.mu.Lock() 500 if !s.headerOk { 501 writeHeaderFrame = true 502 s.headerOk = true 503 } 504 s.mu.Unlock() 505 if writeHeaderFrame { 506 if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { 507 return err 508 } 509 t.hBuf.Reset() 510 t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) 511 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) 512 if s.sendCompress != "" { 513 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) 514 } 515 p := http2.HeadersFrameParam{ 516 StreamID: s.id, 517 BlockFragment: t.hBuf.Bytes(), 518 EndHeaders: true, 519 } 520 if err := t.framer.writeHeaders(false, p); err != nil { 521 t.Close() 522 return ConnectionErrorf("transport: %v", err) 523 } 524 t.writableChan <- 0 525 } 526 r := bytes.NewBuffer(data) 527 for { 528 if r.Len() == 0 { 529 return nil 530 } 531 size := http2MaxFrameLen 532 s.sendQuotaPool.add(0) 533 // Wait until the stream has some quota to send the data. 534 sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire()) 535 if err != nil { 536 return err 537 } 538 t.sendQuotaPool.add(0) 539 // Wait until the transport has some quota to send the data. 540 tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire()) 541 if err != nil { 542 if _, ok := err.(StreamError); ok { 543 t.sendQuotaPool.cancel() 544 } 545 return err 546 } 547 if sq < size { 548 size = sq 549 } 550 if tq < size { 551 size = tq 552 } 553 p := r.Next(size) 554 ps := len(p) 555 if ps < sq { 556 // Overbooked stream quota. Return it back. 557 s.sendQuotaPool.add(sq - ps) 558 } 559 if ps < tq { 560 // Overbooked transport quota. Return it back. 561 t.sendQuotaPool.add(tq - ps) 562 } 563 t.framer.adjustNumWriters(1) 564 // Got some quota. Try to acquire writing privilege on the 565 // transport. 566 if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { 567 if t.framer.adjustNumWriters(-1) == 0 { 568 // This writer is the last one in this batch and has the 569 // responsibility to flush the buffered frames. It queues 570 // a flush request to controlBuf instead of flushing directly 571 // in order to avoid the race with other writing or flushing. 572 t.controlBuf.put(&flushIO{}) 573 } 574 return err 575 } 576 var forceFlush bool 577 if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last { 578 forceFlush = true 579 } 580 if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil { 581 t.Close() 582 return ConnectionErrorf("transport: %v", err) 583 } 584 if t.framer.adjustNumWriters(-1) == 0 { 585 t.framer.flushWrite() 586 } 587 t.writableChan <- 0 588 } 589 590} 591 592func (t *http2Server) applySettings(ss []http2.Setting) { 593 for _, s := range ss { 594 if s.ID == http2.SettingInitialWindowSize { 595 t.mu.Lock() 596 defer t.mu.Unlock() 597 for _, stream := range t.activeStreams { 598 stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota)) 599 } 600 t.streamSendQuota = s.Val 601 } 602 603 } 604} 605 606// controller running in a separate goroutine takes charge of sending control 607// frames (e.g., window update, reset stream, setting, etc.) to the server. 608func (t *http2Server) controller() { 609 for { 610 select { 611 case i := <-t.controlBuf.get(): 612 t.controlBuf.load() 613 select { 614 case <-t.writableChan: 615 switch i := i.(type) { 616 case *windowUpdate: 617 t.framer.writeWindowUpdate(true, i.streamID, i.increment) 618 case *settings: 619 if i.ack { 620 t.framer.writeSettingsAck(true) 621 t.applySettings(i.ss) 622 } else { 623 t.framer.writeSettings(true, i.ss...) 624 } 625 case *resetStream: 626 t.framer.writeRSTStream(true, i.streamID, i.code) 627 case *flushIO: 628 t.framer.flushWrite() 629 case *ping: 630 t.framer.writePing(true, i.ack, i.data) 631 default: 632 grpclog.Printf("transport: http2Server.controller got unexpected item type %v\n", i) 633 } 634 t.writableChan <- 0 635 continue 636 case <-t.shutdownChan: 637 return 638 } 639 case <-t.shutdownChan: 640 return 641 } 642 } 643} 644 645// Close starts shutting down the http2Server transport. 646// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This 647// could cause some resource issue. Revisit this later. 648func (t *http2Server) Close() (err error) { 649 t.mu.Lock() 650 if t.state == closing { 651 t.mu.Unlock() 652 return errors.New("transport: Close() was already called") 653 } 654 t.state = closing 655 streams := t.activeStreams 656 t.activeStreams = nil 657 t.mu.Unlock() 658 close(t.shutdownChan) 659 err = t.conn.Close() 660 // Cancel all active streams. 661 for _, s := range streams { 662 s.cancel() 663 } 664 return 665} 666 667// closeStream clears the footprint of a stream when the stream is not needed 668// any more. 669func (t *http2Server) closeStream(s *Stream) { 670 t.mu.Lock() 671 delete(t.activeStreams, s.id) 672 t.mu.Unlock() 673 if q := s.fc.restoreConn(); q > 0 { 674 t.controlBuf.put(&windowUpdate{0, q}) 675 } 676 s.mu.Lock() 677 if s.state == streamDone { 678 s.mu.Unlock() 679 return 680 } 681 s.state = streamDone 682 s.mu.Unlock() 683 // In case stream sending and receiving are invoked in separate 684 // goroutines (e.g., bi-directional streaming), cancel needs to be 685 // called to interrupt the potential blocking on other goroutines. 686 s.cancel() 687} 688 689func (t *http2Server) RemoteAddr() net.Addr { 690 return t.conn.RemoteAddr() 691} 692