/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	// defaultServerMaxReceiveMessageSize is the inbound message size limit
	// (4 MiB) installed in defaultServerOptions.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultServerMaxSendMessageSize is the outbound message size limit
	// installed in defaultServerOptions.
	defaultServerMaxSendMessageSize = math.MaxInt32
)

// statusOK is a shared status value carrying codes.OK and an empty message.
var statusOK = status.New(codes.OK, "")

// methodHandler is the signature of a unary method handler: it receives the
// service implementation, the call context, a function that decodes the
// request into a caller-supplied value, and the unary interceptor, and
// returns the response or an error.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the key under which the method is registered.
	MethodName string
	// Handler is the function invoked for this method.
	Handler methodHandler
}

// ServiceDesc represents an RPC service's specification.
71type ServiceDesc struct { 72 ServiceName string 73 // The pointer to the service interface. Used to check whether the user 74 // provided implementation satisfies the interface requirements. 75 HandlerType interface{} 76 Methods []MethodDesc 77 Streams []StreamDesc 78 Metadata interface{} 79} 80 81// service consists of the information of the server serving this service and 82// the methods in this service. 83type service struct { 84 server interface{} // the server for service methods 85 md map[string]*MethodDesc 86 sd map[string]*StreamDesc 87 mdata interface{} 88} 89 90// Server is a gRPC server to serve RPC requests. 91type Server struct { 92 opts serverOptions 93 94 mu sync.Mutex // guards following 95 lis map[net.Listener]bool 96 conns map[transport.ServerTransport]bool 97 serve bool 98 drain bool 99 cv *sync.Cond // signaled when connections close for GracefulStop 100 m map[string]*service // service name -> service info 101 events trace.EventLog 102 103 quit *grpcsync.Event 104 done *grpcsync.Event 105 channelzRemoveOnce sync.Once 106 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop 107 108 channelzID int64 // channelz unique identification number 109 czData *channelzData 110} 111 112type serverOptions struct { 113 creds credentials.TransportCredentials 114 codec baseCodec 115 cp Compressor 116 dc Decompressor 117 unaryInt UnaryServerInterceptor 118 streamInt StreamServerInterceptor 119 chainUnaryInts []UnaryServerInterceptor 120 chainStreamInts []StreamServerInterceptor 121 inTapHandle tap.ServerInHandle 122 statsHandler stats.Handler 123 maxConcurrentStreams uint32 124 maxReceiveMessageSize int 125 maxSendMessageSize int 126 unknownStreamDesc *StreamDesc 127 keepaliveParams keepalive.ServerParameters 128 keepalivePolicy keepalive.EnforcementPolicy 129 initialWindowSize int32 130 initialConnWindowSize int32 131 writeBufferSize int 132 readBufferSize int 133 connectionTimeout time.Duration 134 maxHeaderListSize *uint32 135 
headerTableSize *uint32 136} 137 138var defaultServerOptions = serverOptions{ 139 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, 140 maxSendMessageSize: defaultServerMaxSendMessageSize, 141 connectionTimeout: 120 * time.Second, 142 writeBufferSize: defaultWriteBufSize, 143 readBufferSize: defaultReadBufSize, 144} 145 146// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. 147type ServerOption interface { 148 apply(*serverOptions) 149} 150 151// EmptyServerOption does not alter the server configuration. It can be embedded 152// in another structure to build custom server options. 153// 154// This API is EXPERIMENTAL. 155type EmptyServerOption struct{} 156 157func (EmptyServerOption) apply(*serverOptions) {} 158 159// funcServerOption wraps a function that modifies serverOptions into an 160// implementation of the ServerOption interface. 161type funcServerOption struct { 162 f func(*serverOptions) 163} 164 165func (fdo *funcServerOption) apply(do *serverOptions) { 166 fdo.f(do) 167} 168 169func newFuncServerOption(f func(*serverOptions)) *funcServerOption { 170 return &funcServerOption{ 171 f: f, 172 } 173} 174 175// WriteBufferSize determines how much data can be batched before doing a write on the wire. 176// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. 177// The default value for this buffer is 32KB. 178// Zero will disable the write buffer such that each write will be on underlying connection. 179// Note: A Send call may not directly translate to a write. 180func WriteBufferSize(s int) ServerOption { 181 return newFuncServerOption(func(o *serverOptions) { 182 o.writeBufferSize = s 183 }) 184} 185 186// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 187// for one read syscall. 188// The default value for this buffer is 32KB. 
189// Zero will disable read buffer for a connection so data framer can access the underlying 190// conn directly. 191func ReadBufferSize(s int) ServerOption { 192 return newFuncServerOption(func(o *serverOptions) { 193 o.readBufferSize = s 194 }) 195} 196 197// InitialWindowSize returns a ServerOption that sets window size for stream. 198// The lower bound for window size is 64K and any value smaller than that will be ignored. 199func InitialWindowSize(s int32) ServerOption { 200 return newFuncServerOption(func(o *serverOptions) { 201 o.initialWindowSize = s 202 }) 203} 204 205// InitialConnWindowSize returns a ServerOption that sets window size for a connection. 206// The lower bound for window size is 64K and any value smaller than that will be ignored. 207func InitialConnWindowSize(s int32) ServerOption { 208 return newFuncServerOption(func(o *serverOptions) { 209 o.initialConnWindowSize = s 210 }) 211} 212 213// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 214func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 215 if kp.Time > 0 && kp.Time < time.Second { 216 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s") 217 kp.Time = time.Second 218 } 219 220 return newFuncServerOption(func(o *serverOptions) { 221 o.keepaliveParams = kp 222 }) 223} 224 225// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 226func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 227 return newFuncServerOption(func(o *serverOptions) { 228 o.keepalivePolicy = kep 229 }) 230} 231 232// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 233// 234// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 
235func CustomCodec(codec Codec) ServerOption { 236 return newFuncServerOption(func(o *serverOptions) { 237 o.codec = codec 238 }) 239} 240 241// RPCCompressor returns a ServerOption that sets a compressor for outbound 242// messages. For backward compatibility, all outbound messages will be sent 243// using this compressor, regardless of incoming message compression. By 244// default, server messages will be sent using the same compressor with which 245// request messages were sent. 246// 247// Deprecated: use encoding.RegisterCompressor instead. 248func RPCCompressor(cp Compressor) ServerOption { 249 return newFuncServerOption(func(o *serverOptions) { 250 o.cp = cp 251 }) 252} 253 254// RPCDecompressor returns a ServerOption that sets a decompressor for inbound 255// messages. It has higher priority than decompressors registered via 256// encoding.RegisterCompressor. 257// 258// Deprecated: use encoding.RegisterCompressor instead. 259func RPCDecompressor(dc Decompressor) ServerOption { 260 return newFuncServerOption(func(o *serverOptions) { 261 o.dc = dc 262 }) 263} 264 265// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 266// If this is not set, gRPC uses the default limit. 267// 268// Deprecated: use MaxRecvMsgSize instead. 269func MaxMsgSize(m int) ServerOption { 270 return MaxRecvMsgSize(m) 271} 272 273// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 274// If this is not set, gRPC uses the default 4MB. 275func MaxRecvMsgSize(m int) ServerOption { 276 return newFuncServerOption(func(o *serverOptions) { 277 o.maxReceiveMessageSize = m 278 }) 279} 280 281// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 282// If this is not set, gRPC uses the default `math.MaxInt32`. 
283func MaxSendMsgSize(m int) ServerOption { 284 return newFuncServerOption(func(o *serverOptions) { 285 o.maxSendMessageSize = m 286 }) 287} 288 289// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 290// of concurrent streams to each ServerTransport. 291func MaxConcurrentStreams(n uint32) ServerOption { 292 return newFuncServerOption(func(o *serverOptions) { 293 o.maxConcurrentStreams = n 294 }) 295} 296 297// Creds returns a ServerOption that sets credentials for server connections. 298func Creds(c credentials.TransportCredentials) ServerOption { 299 return newFuncServerOption(func(o *serverOptions) { 300 o.creds = c 301 }) 302} 303 304// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 305// server. Only one unary interceptor can be installed. The construction of multiple 306// interceptors (e.g., chaining) can be implemented at the caller. 307func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { 308 return newFuncServerOption(func(o *serverOptions) { 309 if o.unaryInt != nil { 310 panic("The unary server interceptor was already set and may not be reset.") 311 } 312 o.unaryInt = i 313 }) 314} 315 316// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor 317// for unary RPCs. The first interceptor will be the outer most, 318// while the last interceptor will be the inner most wrapper around the real call. 319// All unary interceptors added by this method will be chained. 320func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption { 321 return newFuncServerOption(func(o *serverOptions) { 322 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...) 323 }) 324} 325 326// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the 327// server. Only one stream interceptor can be installed. 
328func StreamInterceptor(i StreamServerInterceptor) ServerOption { 329 return newFuncServerOption(func(o *serverOptions) { 330 if o.streamInt != nil { 331 panic("The stream server interceptor was already set and may not be reset.") 332 } 333 o.streamInt = i 334 }) 335} 336 337// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor 338// for stream RPCs. The first interceptor will be the outer most, 339// while the last interceptor will be the inner most wrapper around the real call. 340// All stream interceptors added by this method will be chained. 341func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption { 342 return newFuncServerOption(func(o *serverOptions) { 343 o.chainStreamInts = append(o.chainStreamInts, interceptors...) 344 }) 345} 346 347// InTapHandle returns a ServerOption that sets the tap handle for all the server 348// transport to be created. Only one can be installed. 349func InTapHandle(h tap.ServerInHandle) ServerOption { 350 return newFuncServerOption(func(o *serverOptions) { 351 if o.inTapHandle != nil { 352 panic("The tap handle was already set and may not be reset.") 353 } 354 o.inTapHandle = h 355 }) 356} 357 358// StatsHandler returns a ServerOption that sets the stats handler for the server. 359func StatsHandler(h stats.Handler) ServerOption { 360 return newFuncServerOption(func(o *serverOptions) { 361 o.statsHandler = h 362 }) 363} 364 365// UnknownServiceHandler returns a ServerOption that allows for adding a custom 366// unknown service handler. The provided method is a bidi-streaming RPC service 367// handler that will be invoked instead of returning the "unimplemented" gRPC 368// error whenever a request is received for an unregistered service or method. 369// The handling function and stream interceptor (if set) have full access to 370// the ServerStream, including its Context. 
371func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 372 return newFuncServerOption(func(o *serverOptions) { 373 o.unknownStreamDesc = &StreamDesc{ 374 StreamName: "unknown_service_handler", 375 Handler: streamHandler, 376 // We need to assume that the users of the streamHandler will want to use both. 377 ClientStreams: true, 378 ServerStreams: true, 379 } 380 }) 381} 382 383// ConnectionTimeout returns a ServerOption that sets the timeout for 384// connection establishment (up to and including HTTP/2 handshaking) for all 385// new connections. If this is not set, the default is 120 seconds. A zero or 386// negative value will result in an immediate timeout. 387// 388// This API is EXPERIMENTAL. 389func ConnectionTimeout(d time.Duration) ServerOption { 390 return newFuncServerOption(func(o *serverOptions) { 391 o.connectionTimeout = d 392 }) 393} 394 395// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size 396// of header list that the server is prepared to accept. 397func MaxHeaderListSize(s uint32) ServerOption { 398 return newFuncServerOption(func(o *serverOptions) { 399 o.maxHeaderListSize = &s 400 }) 401} 402 403// HeaderTableSize returns a ServerOption that sets the size of dynamic 404// header table for stream. 405// 406// This API is EXPERIMENTAL. 407func HeaderTableSize(s uint32) ServerOption { 408 return newFuncServerOption(func(o *serverOptions) { 409 o.headerTableSize = &s 410 }) 411} 412 413// NewServer creates a gRPC server which has no service registered and has not 414// started to accept requests yet. 
415func NewServer(opt ...ServerOption) *Server { 416 opts := defaultServerOptions 417 for _, o := range opt { 418 o.apply(&opts) 419 } 420 s := &Server{ 421 lis: make(map[net.Listener]bool), 422 opts: opts, 423 conns: make(map[transport.ServerTransport]bool), 424 m: make(map[string]*service), 425 quit: grpcsync.NewEvent(), 426 done: grpcsync.NewEvent(), 427 czData: new(channelzData), 428 } 429 chainUnaryServerInterceptors(s) 430 chainStreamServerInterceptors(s) 431 s.cv = sync.NewCond(&s.mu) 432 if EnableTracing { 433 _, file, line, _ := runtime.Caller(1) 434 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 435 } 436 437 if channelz.IsOn() { 438 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") 439 } 440 return s 441} 442 443// printf records an event in s's event log, unless s has been stopped. 444// REQUIRES s.mu is held. 445func (s *Server) printf(format string, a ...interface{}) { 446 if s.events != nil { 447 s.events.Printf(format, a...) 448 } 449} 450 451// errorf records an error in s's event log, unless s has been stopped. 452// REQUIRES s.mu is held. 453func (s *Server) errorf(format string, a ...interface{}) { 454 if s.events != nil { 455 s.events.Errorf(format, a...) 456 } 457} 458 459// RegisterService registers a service and its implementation to the gRPC 460// server. It is called from the IDL generated code. This must be called before 461// invoking Serve. 
462func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { 463 ht := reflect.TypeOf(sd.HandlerType).Elem() 464 st := reflect.TypeOf(ss) 465 if !st.Implements(ht) { 466 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) 467 } 468 s.register(sd, ss) 469} 470 471func (s *Server) register(sd *ServiceDesc, ss interface{}) { 472 s.mu.Lock() 473 defer s.mu.Unlock() 474 s.printf("RegisterService(%q)", sd.ServiceName) 475 if s.serve { 476 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) 477 } 478 if _, ok := s.m[sd.ServiceName]; ok { 479 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) 480 } 481 srv := &service{ 482 server: ss, 483 md: make(map[string]*MethodDesc), 484 sd: make(map[string]*StreamDesc), 485 mdata: sd.Metadata, 486 } 487 for i := range sd.Methods { 488 d := &sd.Methods[i] 489 srv.md[d.MethodName] = d 490 } 491 for i := range sd.Streams { 492 d := &sd.Streams[i] 493 srv.sd[d.StreamName] = d 494 } 495 s.m[sd.ServiceName] = srv 496} 497 498// MethodInfo contains the information of an RPC including its method name and type. 499type MethodInfo struct { 500 // Name is the method name only, without the service name or package name. 501 Name string 502 // IsClientStream indicates whether the RPC is a client streaming RPC. 503 IsClientStream bool 504 // IsServerStream indicates whether the RPC is a server streaming RPC. 505 IsServerStream bool 506} 507 508// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. 509type ServiceInfo struct { 510 Methods []MethodInfo 511 // Metadata is the metadata specified in ServiceDesc when registering service. 512 Metadata interface{} 513} 514 515// GetServiceInfo returns a map from service names to ServiceInfo. 516// Service names include the package names, in the form of <package>.<service>. 
517func (s *Server) GetServiceInfo() map[string]ServiceInfo { 518 ret := make(map[string]ServiceInfo) 519 for n, srv := range s.m { 520 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) 521 for m := range srv.md { 522 methods = append(methods, MethodInfo{ 523 Name: m, 524 IsClientStream: false, 525 IsServerStream: false, 526 }) 527 } 528 for m, d := range srv.sd { 529 methods = append(methods, MethodInfo{ 530 Name: m, 531 IsClientStream: d.ClientStreams, 532 IsServerStream: d.ServerStreams, 533 }) 534 } 535 536 ret[n] = ServiceInfo{ 537 Methods: methods, 538 Metadata: srv.mdata, 539 } 540 } 541 return ret 542} 543 544// ErrServerStopped indicates that the operation is now illegal because of 545// the server being stopped. 546var ErrServerStopped = errors.New("grpc: the server has been stopped") 547 548func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 549 if s.opts.creds == nil { 550 return rawConn, nil, nil 551 } 552 return s.opts.creds.ServerHandshake(rawConn) 553} 554 555type listenSocket struct { 556 net.Listener 557 channelzID int64 558} 559 560func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { 561 return &channelz.SocketInternalMetric{ 562 SocketOptions: channelz.GetSocketOption(l.Listener), 563 LocalAddr: l.Listener.Addr(), 564 } 565} 566 567func (l *listenSocket) Close() error { 568 err := l.Listener.Close() 569 if channelz.IsOn() { 570 channelz.RemoveEntry(l.channelzID) 571 } 572 return err 573} 574 575// Serve accepts incoming connections on the listener lis, creating a new 576// ServerTransport and service goroutine for each. The service goroutines 577// read gRPC requests and then call the registered handlers to reply to them. 578// Serve returns when lis.Accept fails with fatal errors. lis will be closed when 579// this method returns. 580// Serve will return a non-nil error unless Stop or GracefulStop is called. 
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	// Mark the server as serving; register refuses new services once set.
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Count this Serve call in serveWG for its whole duration.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	// Wrap the listener so it can carry a channelz ID, and track it in s.lis.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On return, close and untrack the listener if it is still tracked in
	// s.lis. This defer runs before the serveWG one above (LIFO order).
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Errors reporting themselves as temporary are retried with a
			// backoff that starts at 5ms, doubles each time, and is capped
			// at 1s.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Wait out the backoff, but give up immediately if the
				// server is being stopped.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure after quit has fired is reported as a clean
			// return rather than an error.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn takes a just-accepted connection that has not had any I/O
// performed on it yet, performs the security and HTTP/2 handshakes on the
// calling goroutine, and then forks a goroutine to serve the connection's
// streams.
func (s *Server) handleRawConn(rawConn net.Conn) {
	// Drop connections accepted after the server started stopping.
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	// Bound the handshakes below by the configured connection timeout.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		// Clear the handshake deadline; this runs on both branches, including
		// for dispatched connections that stay open.
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}

	// Handshaking is done; remove the deadline before regular traffic.
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	// Serve streams on a separate goroutine and untrack the transport once
	// serving finishes.
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
710func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { 711 config := &transport.ServerConfig{ 712 MaxStreams: s.opts.maxConcurrentStreams, 713 AuthInfo: authInfo, 714 InTapHandle: s.opts.inTapHandle, 715 StatsHandler: s.opts.statsHandler, 716 KeepaliveParams: s.opts.keepaliveParams, 717 KeepalivePolicy: s.opts.keepalivePolicy, 718 InitialWindowSize: s.opts.initialWindowSize, 719 InitialConnWindowSize: s.opts.initialConnWindowSize, 720 WriteBufferSize: s.opts.writeBufferSize, 721 ReadBufferSize: s.opts.readBufferSize, 722 ChannelzParentID: s.channelzID, 723 MaxHeaderListSize: s.opts.maxHeaderListSize, 724 HeaderTableSize: s.opts.headerTableSize, 725 } 726 st, err := transport.NewServerTransport("http2", c, config) 727 if err != nil { 728 s.mu.Lock() 729 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) 730 s.mu.Unlock() 731 c.Close() 732 channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) 733 return nil 734 } 735 736 return st 737} 738 739func (s *Server) serveStreams(st transport.ServerTransport) { 740 defer st.Close() 741 var wg sync.WaitGroup 742 st.HandleStreams(func(stream *transport.Stream) { 743 wg.Add(1) 744 go func() { 745 defer wg.Done() 746 s.handleStream(st, stream, s.traceInfo(st, stream)) 747 }() 748 }, func(ctx context.Context, method string) context.Context { 749 if !EnableTracing { 750 return ctx 751 } 752 tr := trace.New("grpc.Recv."+methodFamily(method), method) 753 return trace.NewContext(ctx, tr) 754 }) 755 wg.Wait() 756} 757 758var _ http.Handler = (*Server)(nil) 759 760// ServeHTTP implements the Go standard library's http.Handler 761// interface by responding to the gRPC request r, by looking up 762// the requested gRPC method in the gRPC server s. 763// 764// The provided HTTP request must have arrived on an HTTP/2 765// connection. 
When using the Go standard library's server, 766// practically this means that the Request must also have arrived 767// over TLS. 768// 769// To share one port (such as 443 for https) between gRPC and an 770// existing http.Handler, use a root http.Handler such as: 771// 772// if r.ProtoMajor == 2 && strings.HasPrefix( 773// r.Header.Get("Content-Type"), "application/grpc") { 774// grpcServer.ServeHTTP(w, r) 775// } else { 776// yourMux.ServeHTTP(w, r) 777// } 778// 779// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally 780// separate from grpc-go's HTTP/2 server. Performance and features may vary 781// between the two paths. ServeHTTP does not support some gRPC features 782// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL 783// and subject to change. 784func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 785 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) 786 if err != nil { 787 http.Error(w, err.Error(), http.StatusInternalServerError) 788 return 789 } 790 if !s.addConn(st) { 791 return 792 } 793 defer s.removeConn(st) 794 s.serveStreams(st) 795} 796 797// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. 798// If tracing is not enabled, it returns nil. 
799func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { 800 if !EnableTracing { 801 return nil 802 } 803 tr, ok := trace.FromContext(stream.Context()) 804 if !ok { 805 return nil 806 } 807 808 trInfo = &traceInfo{ 809 tr: tr, 810 firstLine: firstLine{ 811 client: false, 812 remoteAddr: st.RemoteAddr(), 813 }, 814 } 815 if dl, ok := stream.Context().Deadline(); ok { 816 trInfo.firstLine.deadline = time.Until(dl) 817 } 818 return trInfo 819} 820 821func (s *Server) addConn(st transport.ServerTransport) bool { 822 s.mu.Lock() 823 defer s.mu.Unlock() 824 if s.conns == nil { 825 st.Close() 826 return false 827 } 828 if s.drain { 829 // Transport added after we drained our existing conns: drain it 830 // immediately. 831 st.Drain() 832 } 833 s.conns[st] = true 834 return true 835} 836 837func (s *Server) removeConn(st transport.ServerTransport) { 838 s.mu.Lock() 839 defer s.mu.Unlock() 840 if s.conns != nil { 841 delete(s.conns, st) 842 s.cv.Broadcast() 843 } 844} 845 846func (s *Server) channelzMetric() *channelz.ServerInternalMetric { 847 return &channelz.ServerInternalMetric{ 848 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted), 849 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded), 850 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed), 851 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)), 852 } 853} 854 855func (s *Server) incrCallsStarted() { 856 atomic.AddInt64(&s.czData.callsStarted, 1) 857 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano()) 858} 859 860func (s *Server) incrCallsSucceeded() { 861 atomic.AddInt64(&s.czData.callsSucceeded, 1) 862} 863 864func (s *Server) incrCallsFailed() { 865 atomic.AddInt64(&s.czData.callsFailed, 1) 866} 867 868func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { 869 data, err := 
encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one
// and stores the result in s.opts.unaryInt.
//
// The stored value is nil when no interceptor is configured, the interceptor
// itself when exactly one is configured, and otherwise a wrapper that calls
// the first interceptor with a handler that continues down the chain.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		// A fresh slice is built here, so s.opts.chainUnaryInts is not modified.
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.unaryInt = chainedInt
}

// getChainUnaryHandler recursively generates the chained UnaryHandler.
//
// It returns the handler that interceptors[curr] should call next: a closure
// invoking interceptors[curr+1], or finalHandler once curr is the last index.
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(ctx context.Context, req interface{}) (interface{}, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

// processUnaryRPC serves a single unary RPC on stream.
//
// It reads and decompresses the request message, invokes md.Handler with a
// decode callback and the server's unary interceptor, and then writes either
// the handler's error status or the reply followed by an OK status. When a
// stats handler, trace info or channelz is active, begin/end bookkeeping is
// performed around the call.
//
// The named result err is read by the deferred bookkeeping function, so the
// value returned from any path decides whether the call is recorded as failed.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	// Skip all bookkeeping, including the defer, when nothing would consume it.
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				// io.EOF is not treated as a failure here or in the branches below.
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				// statsBegin is non-nil here: it is assigned in the sh != nil
				// branch above, and sh does not change in between.
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Log the client header to the binary logger, if one is configured for
	// this method.
	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		// md shadows the *MethodDesc parameter for the rest of this block only.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline already in the past is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		// The local peer shadows the imported peer package inside this if.
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			// NOTE(review): the WriteStatus result is ignored here, unlike the
			// later paths that log a warning on failure -- confirm intended.
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	// payInfo is allocated only when a stats handler or binary logger is
	// present; df below reads payInfo.wireLength under sh != nil.
	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		// If err converts to a status, send it to the client; a failure to
		// write that status is only logged.
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is the decode callback handed to the method handler: it unmarshals the
	// received bytes d into v, then notifies the stats handler, binary logger
	// and tracer that the request payload was read.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	// The handler's context carries the stream so it can be retrieved with
	// ServerTransportStreamFromContext.
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	// err below is scoped to the if statement; the returns inside still assign
	// it to the named result.
	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			// An error that is not a status must be a transport.ConnectionError;
			// any other type is treated as a programming error and panics.
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			// NOTE(review): appErr is always nil on this path (the non-nil case
			// returned above), so the logged trailer carries no error even
			// though sending failed -- confirm whether err was intended.
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

// chainStreamServerInterceptors chains all stream server interceptors into one
// and stores the result in s.opts.streamInt.
//
// The stored value is nil when no interceptor is configured, the interceptor
// itself when exactly one is configured, and otherwise a wrapper that calls
// the first interceptor with a handler that continues down the chain.
func chainStreamServerInterceptors(s *Server) {
	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainStreamInts
	if s.opts.streamInt != nil {
		// A fresh slice is built here, so s.opts.chainStreamInts is not modified.
		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
	}

	var chainedInt StreamServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.streamInt = chainedInt
}

// getChainStreamHandler recursively generates the chained StreamHandler.
//
// It returns the handler that interceptors[curr] should call next: a closure
// invoking interceptors[curr+1], or finalHandler once curr is the last index.
func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(srv interface{}, ss ServerStream) error {
		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
	}
}

// processStreamingRPC serves a single streaming RPC on stream.
//
// It builds a serverStream around stream and passes it to sd.Handler, either
// directly or through s.opts.streamInt when an interceptor is installed, and
// then writes the resulting status. srv may be nil (handleStream passes nil
// for the unknown-service handler); the handler then receives a nil server.
//
// The named result err is read by the deferred bookkeeping function.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				// ss.mu is held while touching ss.trInfo.tr -- presumably
				// because serverStream methods also use it; confirm there.
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				// statsBegin was assigned in the sh != nil branch above.
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Log the client header to the binary logger, if one is configured for
	// this method.
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline already in the past is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		// The local peer shadows the imported peer package inside this if.
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			// The WriteStatus result is ignored here (see the TODO further down).
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	// server stays a nil interface value when srv is nil.
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Wrap errors that are not statuses as codes.Unknown.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	// The WriteStatus error becomes the function's result, so the deferred
	// bookkeeping sees it.
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		// appErr is nil on this path; the non-nil case returned above.
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

// handleStream routes an incoming stream to the registered unary or streaming
// handler, based on stream.Method().
//
// After an optional leading '/' is dropped, the method name is split at its
// last '/' into a service and a method. A name without any '/' is rejected as
// malformed. If no registered handler matches, the stream goes to
// s.opts.unknownStreamDesc when one is configured; otherwise the client
// receives codes.Unimplemented.
//
// On the error paths handled here the trace is finished in this function; on
// the dispatched paths the process* functions finish it in their defers.
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		// NOTE(review): malformed names are reported with
		// codes.ResourceExhausted, while unknown services/methods below use
		// codes.Unimplemented -- confirm this difference is intended.
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.m[service]
	if knownService {
		// Unary methods are looked up before streaming ones.
		if md, ok := srv.md[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.sd[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known service with an unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

// streamKey is the context key under which the ServerTransportStream is saved.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// This API is EXPERIMENTAL.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx.
Returns nil if the given context has no stream associated with it 1468// (which implies it is not an RPC invocation context). 1469// 1470// This API is EXPERIMENTAL. 1471func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { 1472 s, _ := ctx.Value(streamKey{}).(ServerTransportStream) 1473 return s 1474} 1475 1476// Stop stops the gRPC server. It immediately closes all open 1477// connections and listeners. 1478// It cancels all active RPCs on the server side and the corresponding 1479// pending RPCs on the client side will get notified by connection 1480// errors. 1481func (s *Server) Stop() { 1482 s.quit.Fire() 1483 1484 defer func() { 1485 s.serveWG.Wait() 1486 s.done.Fire() 1487 }() 1488 1489 s.channelzRemoveOnce.Do(func() { 1490 if channelz.IsOn() { 1491 channelz.RemoveEntry(s.channelzID) 1492 } 1493 }) 1494 1495 s.mu.Lock() 1496 listeners := s.lis 1497 s.lis = nil 1498 st := s.conns 1499 s.conns = nil 1500 // interrupt GracefulStop if Stop and GracefulStop are called concurrently. 1501 s.cv.Broadcast() 1502 s.mu.Unlock() 1503 1504 for lis := range listeners { 1505 lis.Close() 1506 } 1507 for c := range st { 1508 c.Close() 1509 } 1510 1511 s.mu.Lock() 1512 if s.events != nil { 1513 s.events.Finish() 1514 s.events = nil 1515 } 1516 s.mu.Unlock() 1517} 1518 1519// GracefulStop stops the gRPC server gracefully. It stops the server from 1520// accepting new connections and RPCs and blocks until all the pending RPCs are 1521// finished. 
1522func (s *Server) GracefulStop() { 1523 s.quit.Fire() 1524 defer s.done.Fire() 1525 1526 s.channelzRemoveOnce.Do(func() { 1527 if channelz.IsOn() { 1528 channelz.RemoveEntry(s.channelzID) 1529 } 1530 }) 1531 s.mu.Lock() 1532 if s.conns == nil { 1533 s.mu.Unlock() 1534 return 1535 } 1536 1537 for lis := range s.lis { 1538 lis.Close() 1539 } 1540 s.lis = nil 1541 if !s.drain { 1542 for st := range s.conns { 1543 st.Drain() 1544 } 1545 s.drain = true 1546 } 1547 1548 // Wait for serving threads to be ready to exit. Only then can we be sure no 1549 // new conns will be created. 1550 s.mu.Unlock() 1551 s.serveWG.Wait() 1552 s.mu.Lock() 1553 1554 for len(s.conns) != 0 { 1555 s.cv.Wait() 1556 } 1557 s.conns = nil 1558 if s.events != nil { 1559 s.events.Finish() 1560 s.events = nil 1561 } 1562 s.mu.Unlock() 1563} 1564 1565// contentSubtype must be lowercase 1566// cannot return nil 1567func (s *Server) getCodec(contentSubtype string) baseCodec { 1568 if s.opts.codec != nil { 1569 return s.opts.codec 1570 } 1571 if contentSubtype == "" { 1572 return encoding.GetCodec(proto.Name) 1573 } 1574 codec := encoding.GetCodec(contentSubtype) 1575 if codec == nil { 1576 return encoding.GetCodec(proto.Name) 1577 } 1578 return codec 1579} 1580 1581// SetHeader sets the header metadata. 1582// When called multiple times, all the provided metadata will be merged. 1583// All the metadata will be sent out when one of the following happens: 1584// - grpc.SendHeader() is called; 1585// - The first response is sent out; 1586// - An RPC status is sent out (error or success). 1587func SetHeader(ctx context.Context, md metadata.MD) error { 1588 if md.Len() == 0 { 1589 return nil 1590 } 1591 stream := ServerTransportStreamFromContext(ctx) 1592 if stream == nil { 1593 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1594 } 1595 return stream.SetHeader(md) 1596} 1597 1598// SendHeader sends header metadata. It may be called at most once. 
1599// The provided md and headers set by SetHeader() will be sent. 1600func SendHeader(ctx context.Context, md metadata.MD) error { 1601 stream := ServerTransportStreamFromContext(ctx) 1602 if stream == nil { 1603 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1604 } 1605 if err := stream.SendHeader(md); err != nil { 1606 return toRPCErr(err) 1607 } 1608 return nil 1609} 1610 1611// SetTrailer sets the trailer metadata that will be sent when an RPC returns. 1612// When called more than once, all the provided metadata will be merged. 1613func SetTrailer(ctx context.Context, md metadata.MD) error { 1614 if md.Len() == 0 { 1615 return nil 1616 } 1617 stream := ServerTransportStreamFromContext(ctx) 1618 if stream == nil { 1619 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1620 } 1621 return stream.SetTrailer(md) 1622} 1623 1624// Method returns the method string for the server context. The returned 1625// string is in the format of "/service/method". 1626func Method(ctx context.Context) (string, bool) { 1627 s := ServerTransportStreamFromContext(ctx) 1628 if s == nil { 1629 return "", false 1630 } 1631 return s.Method(), true 1632} 1633 1634type channelzServer struct { 1635 s *Server 1636} 1637 1638func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { 1639 return c.s.channelzMetric() 1640} 1641