/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

// Default message size limits, in bytes, used to populate
// defaultServerOptions.
const (
	// defaultServerMaxReceiveMessageSize is 4 MiB.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultServerMaxSendMessageSize is the largest value an int32 can hold.
	defaultServerMaxSendMessageSize = math.MaxInt32
)

// statusOK is the shared OK status written at the end of a successful unary
// RPC.
var statusOK = status.New(codes.OK, "")

// methodHandler is the signature of the function stored in MethodDesc.Handler.
// processUnaryRPC invokes it with the registered service implementation, the
// RPC context, a function that decodes the request into a caller-supplied
// value, and the server's unary interceptor (which may be nil).
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the key under which the method is stored when the
	// enclosing service is registered.
	MethodName string
	// Handler is called to serve a unary RPC for this method.
	Handler methodHandler
}

// ServiceDesc represents an RPC service's specification.
71type ServiceDesc struct { 72 ServiceName string 73 // The pointer to the service interface. Used to check whether the user 74 // provided implementation satisfies the interface requirements. 75 HandlerType interface{} 76 Methods []MethodDesc 77 Streams []StreamDesc 78 Metadata interface{} 79} 80 81// service consists of the information of the server serving this service and 82// the methods in this service. 83type service struct { 84 server interface{} // the server for service methods 85 md map[string]*MethodDesc 86 sd map[string]*StreamDesc 87 mdata interface{} 88} 89 90// Server is a gRPC server to serve RPC requests. 91type Server struct { 92 opts serverOptions 93 94 mu sync.Mutex // guards following 95 lis map[net.Listener]bool 96 conns map[transport.ServerTransport]bool 97 serve bool 98 drain bool 99 cv *sync.Cond // signaled when connections close for GracefulStop 100 m map[string]*service // service name -> service info 101 events trace.EventLog 102 103 quit *grpcsync.Event 104 done *grpcsync.Event 105 channelzRemoveOnce sync.Once 106 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop 107 108 channelzID int64 // channelz unique identification number 109 czData *channelzData 110} 111 112type serverOptions struct { 113 creds credentials.TransportCredentials 114 codec baseCodec 115 cp Compressor 116 dc Decompressor 117 unaryInt UnaryServerInterceptor 118 streamInt StreamServerInterceptor 119 inTapHandle tap.ServerInHandle 120 statsHandler stats.Handler 121 maxConcurrentStreams uint32 122 maxReceiveMessageSize int 123 maxSendMessageSize int 124 unknownStreamDesc *StreamDesc 125 keepaliveParams keepalive.ServerParameters 126 keepalivePolicy keepalive.EnforcementPolicy 127 initialWindowSize int32 128 initialConnWindowSize int32 129 writeBufferSize int 130 readBufferSize int 131 connectionTimeout time.Duration 132 maxHeaderListSize *uint32 133} 134 135var defaultServerOptions = serverOptions{ 136 maxReceiveMessageSize: 
defaultServerMaxReceiveMessageSize, 137 maxSendMessageSize: defaultServerMaxSendMessageSize, 138 connectionTimeout: 120 * time.Second, 139 writeBufferSize: defaultWriteBufSize, 140 readBufferSize: defaultReadBufSize, 141} 142 143// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. 144type ServerOption interface { 145 apply(*serverOptions) 146} 147 148// EmptyServerOption does not alter the server configuration. It can be embedded 149// in another structure to build custom server options. 150// 151// This API is EXPERIMENTAL. 152type EmptyServerOption struct{} 153 154func (EmptyServerOption) apply(*serverOptions) {} 155 156// funcServerOption wraps a function that modifies serverOptions into an 157// implementation of the ServerOption interface. 158type funcServerOption struct { 159 f func(*serverOptions) 160} 161 162func (fdo *funcServerOption) apply(do *serverOptions) { 163 fdo.f(do) 164} 165 166func newFuncServerOption(f func(*serverOptions)) *funcServerOption { 167 return &funcServerOption{ 168 f: f, 169 } 170} 171 172// WriteBufferSize determines how much data can be batched before doing a write on the wire. 173// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. 174// The default value for this buffer is 32KB. 175// Zero will disable the write buffer such that each write will be on underlying connection. 176// Note: A Send call may not directly translate to a write. 177func WriteBufferSize(s int) ServerOption { 178 return newFuncServerOption(func(o *serverOptions) { 179 o.writeBufferSize = s 180 }) 181} 182 183// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 184// for one read syscall. 185// The default value for this buffer is 32KB. 186// Zero will disable read buffer for a connection so data framer can access the underlying 187// conn directly. 
188func ReadBufferSize(s int) ServerOption { 189 return newFuncServerOption(func(o *serverOptions) { 190 o.readBufferSize = s 191 }) 192} 193 194// InitialWindowSize returns a ServerOption that sets window size for stream. 195// The lower bound for window size is 64K and any value smaller than that will be ignored. 196func InitialWindowSize(s int32) ServerOption { 197 return newFuncServerOption(func(o *serverOptions) { 198 o.initialWindowSize = s 199 }) 200} 201 202// InitialConnWindowSize returns a ServerOption that sets window size for a connection. 203// The lower bound for window size is 64K and any value smaller than that will be ignored. 204func InitialConnWindowSize(s int32) ServerOption { 205 return newFuncServerOption(func(o *serverOptions) { 206 o.initialConnWindowSize = s 207 }) 208} 209 210// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 211func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 212 if kp.Time > 0 && kp.Time < time.Second { 213 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s") 214 kp.Time = time.Second 215 } 216 217 return newFuncServerOption(func(o *serverOptions) { 218 o.keepaliveParams = kp 219 }) 220} 221 222// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 223func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 224 return newFuncServerOption(func(o *serverOptions) { 225 o.keepalivePolicy = kep 226 }) 227} 228 229// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 230// 231// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 232func CustomCodec(codec Codec) ServerOption { 233 return newFuncServerOption(func(o *serverOptions) { 234 o.codec = codec 235 }) 236} 237 238// RPCCompressor returns a ServerOption that sets a compressor for outbound 239// messages. 
For backward compatibility, all outbound messages will be sent 240// using this compressor, regardless of incoming message compression. By 241// default, server messages will be sent using the same compressor with which 242// request messages were sent. 243// 244// Deprecated: use encoding.RegisterCompressor instead. 245func RPCCompressor(cp Compressor) ServerOption { 246 return newFuncServerOption(func(o *serverOptions) { 247 o.cp = cp 248 }) 249} 250 251// RPCDecompressor returns a ServerOption that sets a decompressor for inbound 252// messages. It has higher priority than decompressors registered via 253// encoding.RegisterCompressor. 254// 255// Deprecated: use encoding.RegisterCompressor instead. 256func RPCDecompressor(dc Decompressor) ServerOption { 257 return newFuncServerOption(func(o *serverOptions) { 258 o.dc = dc 259 }) 260} 261 262// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 263// If this is not set, gRPC uses the default limit. 264// 265// Deprecated: use MaxRecvMsgSize instead. 266func MaxMsgSize(m int) ServerOption { 267 return MaxRecvMsgSize(m) 268} 269 270// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 271// If this is not set, gRPC uses the default 4MB. 272func MaxRecvMsgSize(m int) ServerOption { 273 return newFuncServerOption(func(o *serverOptions) { 274 o.maxReceiveMessageSize = m 275 }) 276} 277 278// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 279// If this is not set, gRPC uses the default `math.MaxInt32`. 280func MaxSendMsgSize(m int) ServerOption { 281 return newFuncServerOption(func(o *serverOptions) { 282 o.maxSendMessageSize = m 283 }) 284} 285 286// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 287// of concurrent streams to each ServerTransport. 
288func MaxConcurrentStreams(n uint32) ServerOption { 289 return newFuncServerOption(func(o *serverOptions) { 290 o.maxConcurrentStreams = n 291 }) 292} 293 294// Creds returns a ServerOption that sets credentials for server connections. 295func Creds(c credentials.TransportCredentials) ServerOption { 296 return newFuncServerOption(func(o *serverOptions) { 297 o.creds = c 298 }) 299} 300 301// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 302// server. Only one unary interceptor can be installed. The construction of multiple 303// interceptors (e.g., chaining) can be implemented at the caller. 304func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { 305 return newFuncServerOption(func(o *serverOptions) { 306 if o.unaryInt != nil { 307 panic("The unary server interceptor was already set and may not be reset.") 308 } 309 o.unaryInt = i 310 }) 311} 312 313// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the 314// server. Only one stream interceptor can be installed. 315func StreamInterceptor(i StreamServerInterceptor) ServerOption { 316 return newFuncServerOption(func(o *serverOptions) { 317 if o.streamInt != nil { 318 panic("The stream server interceptor was already set and may not be reset.") 319 } 320 o.streamInt = i 321 }) 322} 323 324// InTapHandle returns a ServerOption that sets the tap handle for all the server 325// transport to be created. Only one can be installed. 326func InTapHandle(h tap.ServerInHandle) ServerOption { 327 return newFuncServerOption(func(o *serverOptions) { 328 if o.inTapHandle != nil { 329 panic("The tap handle was already set and may not be reset.") 330 } 331 o.inTapHandle = h 332 }) 333} 334 335// StatsHandler returns a ServerOption that sets the stats handler for the server. 
336func StatsHandler(h stats.Handler) ServerOption { 337 return newFuncServerOption(func(o *serverOptions) { 338 o.statsHandler = h 339 }) 340} 341 342// UnknownServiceHandler returns a ServerOption that allows for adding a custom 343// unknown service handler. The provided method is a bidi-streaming RPC service 344// handler that will be invoked instead of returning the "unimplemented" gRPC 345// error whenever a request is received for an unregistered service or method. 346// The handling function has full access to the Context of the request and the 347// stream, and the invocation bypasses interceptors. 348func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 349 return newFuncServerOption(func(o *serverOptions) { 350 o.unknownStreamDesc = &StreamDesc{ 351 StreamName: "unknown_service_handler", 352 Handler: streamHandler, 353 // We need to assume that the users of the streamHandler will want to use both. 354 ClientStreams: true, 355 ServerStreams: true, 356 } 357 }) 358} 359 360// ConnectionTimeout returns a ServerOption that sets the timeout for 361// connection establishment (up to and including HTTP/2 handshaking) for all 362// new connections. If this is not set, the default is 120 seconds. A zero or 363// negative value will result in an immediate timeout. 364// 365// This API is EXPERIMENTAL. 366func ConnectionTimeout(d time.Duration) ServerOption { 367 return newFuncServerOption(func(o *serverOptions) { 368 o.connectionTimeout = d 369 }) 370} 371 372// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size 373// of header list that the server is prepared to accept. 374func MaxHeaderListSize(s uint32) ServerOption { 375 return newFuncServerOption(func(o *serverOptions) { 376 o.maxHeaderListSize = &s 377 }) 378} 379 380// NewServer creates a gRPC server which has no service registered and has not 381// started to accept requests yet. 
382func NewServer(opt ...ServerOption) *Server { 383 opts := defaultServerOptions 384 for _, o := range opt { 385 o.apply(&opts) 386 } 387 s := &Server{ 388 lis: make(map[net.Listener]bool), 389 opts: opts, 390 conns: make(map[transport.ServerTransport]bool), 391 m: make(map[string]*service), 392 quit: grpcsync.NewEvent(), 393 done: grpcsync.NewEvent(), 394 czData: new(channelzData), 395 } 396 s.cv = sync.NewCond(&s.mu) 397 if EnableTracing { 398 _, file, line, _ := runtime.Caller(1) 399 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 400 } 401 402 if channelz.IsOn() { 403 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") 404 } 405 return s 406} 407 408// printf records an event in s's event log, unless s has been stopped. 409// REQUIRES s.mu is held. 410func (s *Server) printf(format string, a ...interface{}) { 411 if s.events != nil { 412 s.events.Printf(format, a...) 413 } 414} 415 416// errorf records an error in s's event log, unless s has been stopped. 417// REQUIRES s.mu is held. 418func (s *Server) errorf(format string, a ...interface{}) { 419 if s.events != nil { 420 s.events.Errorf(format, a...) 421 } 422} 423 424// RegisterService registers a service and its implementation to the gRPC 425// server. It is called from the IDL generated code. This must be called before 426// invoking Serve. 
427func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { 428 ht := reflect.TypeOf(sd.HandlerType).Elem() 429 st := reflect.TypeOf(ss) 430 if !st.Implements(ht) { 431 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) 432 } 433 s.register(sd, ss) 434} 435 436func (s *Server) register(sd *ServiceDesc, ss interface{}) { 437 s.mu.Lock() 438 defer s.mu.Unlock() 439 s.printf("RegisterService(%q)", sd.ServiceName) 440 if s.serve { 441 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) 442 } 443 if _, ok := s.m[sd.ServiceName]; ok { 444 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) 445 } 446 srv := &service{ 447 server: ss, 448 md: make(map[string]*MethodDesc), 449 sd: make(map[string]*StreamDesc), 450 mdata: sd.Metadata, 451 } 452 for i := range sd.Methods { 453 d := &sd.Methods[i] 454 srv.md[d.MethodName] = d 455 } 456 for i := range sd.Streams { 457 d := &sd.Streams[i] 458 srv.sd[d.StreamName] = d 459 } 460 s.m[sd.ServiceName] = srv 461} 462 463// MethodInfo contains the information of an RPC including its method name and type. 464type MethodInfo struct { 465 // Name is the method name only, without the service name or package name. 466 Name string 467 // IsClientStream indicates whether the RPC is a client streaming RPC. 468 IsClientStream bool 469 // IsServerStream indicates whether the RPC is a server streaming RPC. 470 IsServerStream bool 471} 472 473// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. 474type ServiceInfo struct { 475 Methods []MethodInfo 476 // Metadata is the metadata specified in ServiceDesc when registering service. 477 Metadata interface{} 478} 479 480// GetServiceInfo returns a map from service names to ServiceInfo. 481// Service names include the package names, in the form of <package>.<service>. 
482func (s *Server) GetServiceInfo() map[string]ServiceInfo { 483 ret := make(map[string]ServiceInfo) 484 for n, srv := range s.m { 485 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) 486 for m := range srv.md { 487 methods = append(methods, MethodInfo{ 488 Name: m, 489 IsClientStream: false, 490 IsServerStream: false, 491 }) 492 } 493 for m, d := range srv.sd { 494 methods = append(methods, MethodInfo{ 495 Name: m, 496 IsClientStream: d.ClientStreams, 497 IsServerStream: d.ServerStreams, 498 }) 499 } 500 501 ret[n] = ServiceInfo{ 502 Methods: methods, 503 Metadata: srv.mdata, 504 } 505 } 506 return ret 507} 508 509// ErrServerStopped indicates that the operation is now illegal because of 510// the server being stopped. 511var ErrServerStopped = errors.New("grpc: the server has been stopped") 512 513func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 514 if s.opts.creds == nil { 515 return rawConn, nil, nil 516 } 517 return s.opts.creds.ServerHandshake(rawConn) 518} 519 520type listenSocket struct { 521 net.Listener 522 channelzID int64 523} 524 525func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { 526 return &channelz.SocketInternalMetric{ 527 SocketOptions: channelz.GetSocketOption(l.Listener), 528 LocalAddr: l.Listener.Addr(), 529 } 530} 531 532func (l *listenSocket) Close() error { 533 err := l.Listener.Close() 534 if channelz.IsOn() { 535 channelz.RemoveEntry(l.channelzID) 536 } 537 return err 538} 539 540// Serve accepts incoming connections on the listener lis, creating a new 541// ServerTransport and service goroutine for each. The service goroutines 542// read gRPC requests and then call the registered handlers to reply to them. 543// Serve returns when lis.Accept fails with fatal errors. lis will be closed when 544// this method returns. 545// Serve will return a non-nil error unless Stop or GracefulStop is called. 
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	// Mark the server as serving; register() refuses new services once this
	// flag is set.
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	// Wrap the listener so that closing it also removes its channelz entry,
	// and track it in s.lis while s.mu is still held.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On return, close and forget the listener if it is still tracked in
	// s.lis.
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Errors that report themselves as temporary are retried with a
			// backoff that starts at 5ms, doubles each time, and is capped
			// at 1s.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Sleep for the backoff, but stop waiting as soon as the
				// quit event fires.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure after the quit event fired is reported as a
			// clean return.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
//
// It performs the security handshake and the HTTP/2 transport setup under a
// deadline of s.opts.connectionTimeout, registers the resulting transport
// with the server, and then serves its streams on a new goroutine.
func (s *Server) handleRawConn(rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	// Bound the whole connection setup (handshake plus transport creation).
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		// NOTE(review): this also runs after rawConn.Close() above; the
		// result of SetDeadline is ignored here, so that case is presumably
		// harmless -- confirm.
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}

	// Setup succeeded: clear the deadline before the connection is used for
	// RPC traffic.
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
	// Translate the server options into the transport-level configuration.
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		AuthInfo:              authInfo,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandler:          s.opts.statsHandler,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
	}
	st, err := transport.NewServerTransport("http2", c, config)
	if err != nil {
		// Record the failure, close the connection and return nil so the
		// caller can abandon this connection.
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		c.Close()
		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
		return nil
	}

	return st
}

// serveStreams hands every stream that arrives on st to handleStream, each on
// its own goroutine, and returns once HandleStreams has returned and all of
// those goroutines have finished. st is closed when serveStreams returns.
func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	}, func(ctx context.Context, method string) context.Context {
		// With tracing enabled, attach a new trace to the context; traceInfo
		// later retrieves it from the stream's context.
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

// Compile-time check that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//   if r.ProtoMajor == 2 && strings.HasPrefix(
//   	r.Header.Get("Content-Type"), "application/grpc") {
//   	grpcServer.ServeHTTP(w, r)
//   } else {
//   	yourMux.ServeHTTP(w, r)
//   }
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
// and subject to change.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if !s.addConn(st) {
		return
	}
	defer s.removeConn(st)
	s.serveStreams(st)
}

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
// It also returns nil when the stream's context carries no trace.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	if !EnableTracing {
		return nil
	}
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: st.RemoteAddr(),
		},
	}
	// Record the time remaining until the stream's deadline, if it has one.
	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = time.Until(dl)
	}
	return trInfo
}

// addConn registers st in s.conns and reports whether it was added. When
// s.conns is nil, st is closed and false is returned. When the server is
// draining, st is drained immediately but still registered.
func (s *Server) addConn(st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		st.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain()
	}
	s.conns[st] = true
	return true
}

// removeConn deletes st from s.conns and broadcasts on s.cv; it does nothing
// when s.conns is nil.
func (s *Server) removeConn(st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns != nil {
		delete(s.conns, st)
		s.cv.Broadcast()
	}
}

// channelzMetric returns a snapshot of the server's call counters, each read
// atomically.
func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

// incrCallsStarted atomically bumps the started-calls counter and stores the
// current time, in Unix nanoseconds, as the last call start time.
func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

// incrCallsSucceeded atomically bumps the succeeded-calls counter.
func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

// incrCallsFailed atomically bumps the failed-calls counter.
func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

// sendResponse encodes msg with the codec selected for the stream's content
// subtype, compresses it using cp and comp, and writes the framed message to
// stream via t. It returns a ResourceExhausted status error, without writing
// anything, when the payload exceeds s.opts.maxSendMessageSize. After a
// successful write the stats handler, if one is configured, is told about the
// outgoing payload.
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		grpclog.Errorln("grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		grpclog.Errorln("grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}

// processUnaryRPC serves a single unary RPC on stream: it receives and
// decompresses the request, invokes md.Handler (which decodes the request via
// the supplied function), and writes either the response followed by an OK
// status or the error status. Along the way it feeds channelz counters, the
// stats handler, the trace and the binary logger, each only when enabled.
//
// The named result err is read by the deferred functions below to decide
// whether the call counts as failed; io.EOF is not treated as a failure.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	sh := s.opts.statsHandler
	if sh != nil {
		// Report Begin now and End, with the final error if any, when the
		// function returns.
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	if trInfo != nil {
		// Deferred calls run last-in-first-out, so the error logging below
		// runs before the trace is finished.
		defer trInfo.tr.Finish()
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		defer func() {
			if err != nil && err != io.EOF {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
		}()
	}

	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		// Log the client header: incoming metadata, method, remaining
		// timeout (clamped at zero), authority and peer address.
		ctx := stream.Context()
		// md here shadows the *MethodDesc parameter for the rest of this
		// block only.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	// payInfo is allocated only when a stats handler or binary logger is
	// present; df reads payInfo.wireLength only inside its sh != nil branch.
	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is the decode function passed to the handler: it unmarshals the
	// received bytes into v and then notifies the stats handler, binary
	// logger and trace, whichever are enabled.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	// The response is sent with the Last option set.
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		// Note: s below shadows the receiver inside this if statement.
		if s, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, s); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			// NOTE(review): appErr is nil on this path (the handler
			// succeeded), so the trailer is logged without an error even
			// though sendResponse failed with err -- confirm whether err was
			// intended here.
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		// appErr is nil here, so the trailer is logged with a nil Err.
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	sh := s.opts.statsHandler
	if sh != nil {
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}
1151 1152 // If dc is set and matches the stream's compression, use it. Otherwise, try 1153 // to find a matching registered compressor for decomp. 1154 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1155 ss.dc = s.opts.dc 1156 } else if rc != "" && rc != encoding.Identity { 1157 ss.decomp = encoding.GetCompressor(rc) 1158 if ss.decomp == nil { 1159 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1160 t.WriteStatus(ss.s, st) 1161 return st.Err() 1162 } 1163 } 1164 1165 // If cp is set, use it. Otherwise, attempt to compress the response using 1166 // the incoming message compression method. 1167 // 1168 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1169 if s.opts.cp != nil { 1170 ss.cp = s.opts.cp 1171 stream.SetSendCompress(s.opts.cp.Type()) 1172 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1173 // Legacy compressor not specified; attempt to respond with same encoding. 
1174 ss.comp = encoding.GetCompressor(rc) 1175 if ss.comp != nil { 1176 stream.SetSendCompress(rc) 1177 } 1178 } 1179 1180 if trInfo != nil { 1181 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1182 defer func() { 1183 ss.mu.Lock() 1184 if err != nil && err != io.EOF { 1185 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1186 ss.trInfo.tr.SetError() 1187 } 1188 ss.trInfo.tr.Finish() 1189 ss.trInfo.tr = nil 1190 ss.mu.Unlock() 1191 }() 1192 } 1193 var appErr error 1194 var server interface{} 1195 if srv != nil { 1196 server = srv.server 1197 } 1198 if s.opts.streamInt == nil { 1199 appErr = sd.Handler(server, ss) 1200 } else { 1201 info := &StreamServerInfo{ 1202 FullMethod: stream.Method(), 1203 IsClientStream: sd.ClientStreams, 1204 IsServerStream: sd.ServerStreams, 1205 } 1206 appErr = s.opts.streamInt(server, ss, info, sd.Handler) 1207 } 1208 if appErr != nil { 1209 appStatus, ok := status.FromError(appErr) 1210 if !ok { 1211 appStatus = status.New(codes.Unknown, appErr.Error()) 1212 appErr = appStatus.Err() 1213 } 1214 if trInfo != nil { 1215 ss.mu.Lock() 1216 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1217 ss.trInfo.tr.SetError() 1218 ss.mu.Unlock() 1219 } 1220 t.WriteStatus(ss.s, appStatus) 1221 if ss.binlog != nil { 1222 ss.binlog.Log(&binarylog.ServerTrailer{ 1223 Trailer: ss.s.Trailer(), 1224 Err: appErr, 1225 }) 1226 } 1227 // TODO: Should we log an error from WriteStatus here and below? 
1228 return appErr 1229 } 1230 if trInfo != nil { 1231 ss.mu.Lock() 1232 ss.trInfo.tr.LazyLog(stringer("OK"), false) 1233 ss.mu.Unlock() 1234 } 1235 err = t.WriteStatus(ss.s, statusOK) 1236 if ss.binlog != nil { 1237 ss.binlog.Log(&binarylog.ServerTrailer{ 1238 Trailer: ss.s.Trailer(), 1239 Err: appErr, 1240 }) 1241 } 1242 return err 1243} 1244 1245func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { 1246 sm := stream.Method() 1247 if sm != "" && sm[0] == '/' { 1248 sm = sm[1:] 1249 } 1250 pos := strings.LastIndex(sm, "/") 1251 if pos == -1 { 1252 if trInfo != nil { 1253 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) 1254 trInfo.tr.SetError() 1255 } 1256 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) 1257 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil { 1258 if trInfo != nil { 1259 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1260 trInfo.tr.SetError() 1261 } 1262 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) 1263 } 1264 if trInfo != nil { 1265 trInfo.tr.Finish() 1266 } 1267 return 1268 } 1269 service := sm[:pos] 1270 method := sm[pos+1:] 1271 1272 srv, knownService := s.m[service] 1273 if knownService { 1274 if md, ok := srv.md[method]; ok { 1275 s.processUnaryRPC(t, stream, srv, md, trInfo) 1276 return 1277 } 1278 if sd, ok := srv.sd[method]; ok { 1279 s.processStreamingRPC(t, stream, srv, sd, trInfo) 1280 return 1281 } 1282 } 1283 // Unknown service, or known server unknown method. 
1284 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1285 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1286 return 1287 } 1288 var errDesc string 1289 if !knownService { 1290 errDesc = fmt.Sprintf("unknown service %v", service) 1291 } else { 1292 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service) 1293 } 1294 if trInfo != nil { 1295 trInfo.tr.LazyPrintf("%s", errDesc) 1296 trInfo.tr.SetError() 1297 } 1298 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1299 if trInfo != nil { 1300 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1301 trInfo.tr.SetError() 1302 } 1303 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) 1304 } 1305 if trInfo != nil { 1306 trInfo.tr.Finish() 1307 } 1308} 1309 1310// The key to save ServerTransportStream in the context. 1311type streamKey struct{} 1312 1313// NewContextWithServerTransportStream creates a new context from ctx and 1314// attaches stream to it. 1315// 1316// This API is EXPERIMENTAL. 1317func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { 1318 return context.WithValue(ctx, streamKey{}, stream) 1319} 1320 1321// ServerTransportStream is a minimal interface that a transport stream must 1322// implement. This can be used to mock an actual transport stream for tests of 1323// handler code that use, for example, grpc.SetHeader (which requires some 1324// stream to be in context). 1325// 1326// See also NewContextWithServerTransportStream. 1327// 1328// This API is EXPERIMENTAL. 1329type ServerTransportStream interface { 1330 Method() string 1331 SetHeader(md metadata.MD) error 1332 SendHeader(md metadata.MD) error 1333 SetTrailer(md metadata.MD) error 1334} 1335 1336// ServerTransportStreamFromContext returns the ServerTransportStream saved in 1337// ctx. 
Returns nil if the given context has no stream associated with it 1338// (which implies it is not an RPC invocation context). 1339// 1340// This API is EXPERIMENTAL. 1341func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { 1342 s, _ := ctx.Value(streamKey{}).(ServerTransportStream) 1343 return s 1344} 1345 1346// Stop stops the gRPC server. It immediately closes all open 1347// connections and listeners. 1348// It cancels all active RPCs on the server side and the corresponding 1349// pending RPCs on the client side will get notified by connection 1350// errors. 1351func (s *Server) Stop() { 1352 s.quit.Fire() 1353 1354 defer func() { 1355 s.serveWG.Wait() 1356 s.done.Fire() 1357 }() 1358 1359 s.channelzRemoveOnce.Do(func() { 1360 if channelz.IsOn() { 1361 channelz.RemoveEntry(s.channelzID) 1362 } 1363 }) 1364 1365 s.mu.Lock() 1366 listeners := s.lis 1367 s.lis = nil 1368 st := s.conns 1369 s.conns = nil 1370 // interrupt GracefulStop if Stop and GracefulStop are called concurrently. 1371 s.cv.Broadcast() 1372 s.mu.Unlock() 1373 1374 for lis := range listeners { 1375 lis.Close() 1376 } 1377 for c := range st { 1378 c.Close() 1379 } 1380 1381 s.mu.Lock() 1382 if s.events != nil { 1383 s.events.Finish() 1384 s.events = nil 1385 } 1386 s.mu.Unlock() 1387} 1388 1389// GracefulStop stops the gRPC server gracefully. It stops the server from 1390// accepting new connections and RPCs and blocks until all the pending RPCs are 1391// finished. 
1392func (s *Server) GracefulStop() { 1393 s.quit.Fire() 1394 defer s.done.Fire() 1395 1396 s.channelzRemoveOnce.Do(func() { 1397 if channelz.IsOn() { 1398 channelz.RemoveEntry(s.channelzID) 1399 } 1400 }) 1401 s.mu.Lock() 1402 if s.conns == nil { 1403 s.mu.Unlock() 1404 return 1405 } 1406 1407 for lis := range s.lis { 1408 lis.Close() 1409 } 1410 s.lis = nil 1411 if !s.drain { 1412 for st := range s.conns { 1413 st.Drain() 1414 } 1415 s.drain = true 1416 } 1417 1418 // Wait for serving threads to be ready to exit. Only then can we be sure no 1419 // new conns will be created. 1420 s.mu.Unlock() 1421 s.serveWG.Wait() 1422 s.mu.Lock() 1423 1424 for len(s.conns) != 0 { 1425 s.cv.Wait() 1426 } 1427 s.conns = nil 1428 if s.events != nil { 1429 s.events.Finish() 1430 s.events = nil 1431 } 1432 s.mu.Unlock() 1433} 1434 1435// contentSubtype must be lowercase 1436// cannot return nil 1437func (s *Server) getCodec(contentSubtype string) baseCodec { 1438 if s.opts.codec != nil { 1439 return s.opts.codec 1440 } 1441 if contentSubtype == "" { 1442 return encoding.GetCodec(proto.Name) 1443 } 1444 codec := encoding.GetCodec(contentSubtype) 1445 if codec == nil { 1446 return encoding.GetCodec(proto.Name) 1447 } 1448 return codec 1449} 1450 1451// SetHeader sets the header metadata. 1452// When called multiple times, all the provided metadata will be merged. 1453// All the metadata will be sent out when one of the following happens: 1454// - grpc.SendHeader() is called; 1455// - The first response is sent out; 1456// - An RPC status is sent out (error or success). 1457func SetHeader(ctx context.Context, md metadata.MD) error { 1458 if md.Len() == 0 { 1459 return nil 1460 } 1461 stream := ServerTransportStreamFromContext(ctx) 1462 if stream == nil { 1463 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1464 } 1465 return stream.SetHeader(md) 1466} 1467 1468// SendHeader sends header metadata. It may be called at most once. 
1469// The provided md and headers set by SetHeader() will be sent. 1470func SendHeader(ctx context.Context, md metadata.MD) error { 1471 stream := ServerTransportStreamFromContext(ctx) 1472 if stream == nil { 1473 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1474 } 1475 if err := stream.SendHeader(md); err != nil { 1476 return toRPCErr(err) 1477 } 1478 return nil 1479} 1480 1481// SetTrailer sets the trailer metadata that will be sent when an RPC returns. 1482// When called more than once, all the provided metadata will be merged. 1483func SetTrailer(ctx context.Context, md metadata.MD) error { 1484 if md.Len() == 0 { 1485 return nil 1486 } 1487 stream := ServerTransportStreamFromContext(ctx) 1488 if stream == nil { 1489 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1490 } 1491 return stream.SetTrailer(md) 1492} 1493 1494// Method returns the method string for the server context. The returned 1495// string is in the format of "/service/method". 1496func Method(ctx context.Context) (string, bool) { 1497 s := ServerTransportStreamFromContext(ctx) 1498 if s == nil { 1499 return "", false 1500 } 1501 return s.Method(), true 1502} 1503 1504type channelzServer struct { 1505 s *Server 1506} 1507 1508func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { 1509 return c.s.channelzMetric() 1510} 1511