1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "errors" 24 "fmt" 25 "io" 26 "math" 27 "net" 28 "net/http" 29 "reflect" 30 "runtime" 31 "strings" 32 "sync" 33 "sync/atomic" 34 "time" 35 36 "golang.org/x/net/trace" 37 38 "google.golang.org/grpc/codes" 39 "google.golang.org/grpc/credentials" 40 "google.golang.org/grpc/encoding" 41 "google.golang.org/grpc/encoding/proto" 42 "google.golang.org/grpc/grpclog" 43 "google.golang.org/grpc/internal/binarylog" 44 "google.golang.org/grpc/internal/channelz" 45 "google.golang.org/grpc/internal/transport" 46 "google.golang.org/grpc/keepalive" 47 "google.golang.org/grpc/metadata" 48 "google.golang.org/grpc/peer" 49 "google.golang.org/grpc/stats" 50 "google.golang.org/grpc/status" 51 "google.golang.org/grpc/tap" 52) 53 54const ( 55 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 56 defaultServerMaxSendMessageSize = math.MaxInt32 57) 58 59type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) 60 61// MethodDesc represents an RPC service's method specification. 62type MethodDesc struct { 63 MethodName string 64 Handler methodHandler 65} 66 67// ServiceDesc represents an RPC service's specification. 68type ServiceDesc struct { 69 ServiceName string 70 // The pointer to the service interface. 
Used to check whether the user 71 // provided implementation satisfies the interface requirements. 72 HandlerType interface{} 73 Methods []MethodDesc 74 Streams []StreamDesc 75 Metadata interface{} 76} 77 78// service consists of the information of the server serving this service and 79// the methods in this service. 80type service struct { 81 server interface{} // the server for service methods 82 md map[string]*MethodDesc 83 sd map[string]*StreamDesc 84 mdata interface{} 85} 86 87// Server is a gRPC server to serve RPC requests. 88type Server struct { 89 opts serverOptions 90 91 mu sync.Mutex // guards following 92 lis map[net.Listener]bool 93 conns map[transport.ServerTransport]bool 94 serve bool 95 drain bool 96 cv *sync.Cond // signaled when connections close for GracefulStop 97 m map[string]*service // service name -> service info 98 events trace.EventLog 99 100 quit chan struct{} 101 done chan struct{} 102 quitOnce sync.Once 103 doneOnce sync.Once 104 channelzRemoveOnce sync.Once 105 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop 106 107 channelzID int64 // channelz unique identification number 108 czData *channelzData 109} 110 111type serverOptions struct { 112 creds credentials.TransportCredentials 113 codec baseCodec 114 cp Compressor 115 dc Decompressor 116 unaryInt UnaryServerInterceptor 117 streamInt StreamServerInterceptor 118 inTapHandle tap.ServerInHandle 119 statsHandler stats.Handler 120 maxConcurrentStreams uint32 121 maxReceiveMessageSize int 122 maxSendMessageSize int 123 unknownStreamDesc *StreamDesc 124 keepaliveParams keepalive.ServerParameters 125 keepalivePolicy keepalive.EnforcementPolicy 126 initialWindowSize int32 127 initialConnWindowSize int32 128 writeBufferSize int 129 readBufferSize int 130 connectionTimeout time.Duration 131 maxHeaderListSize *uint32 132} 133 134var defaultServerOptions = serverOptions{ 135 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, 136 maxSendMessageSize: 
defaultServerMaxSendMessageSize, 137 connectionTimeout: 120 * time.Second, 138 writeBufferSize: defaultWriteBufSize, 139 readBufferSize: defaultReadBufSize, 140} 141 142// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. 143type ServerOption interface { 144 apply(*serverOptions) 145} 146 147// EmptyServerOption does not alter the server configuration. It can be embedded 148// in another structure to build custom server options. 149// 150// This API is EXPERIMENTAL. 151type EmptyServerOption struct{} 152 153func (EmptyServerOption) apply(*serverOptions) {} 154 155// funcServerOption wraps a function that modifies serverOptions into an 156// implementation of the ServerOption interface. 157type funcServerOption struct { 158 f func(*serverOptions) 159} 160 161func (fdo *funcServerOption) apply(do *serverOptions) { 162 fdo.f(do) 163} 164 165func newFuncServerOption(f func(*serverOptions)) *funcServerOption { 166 return &funcServerOption{ 167 f: f, 168 } 169} 170 171// WriteBufferSize determines how much data can be batched before doing a write on the wire. 172// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. 173// The default value for this buffer is 32KB. 174// Zero will disable the write buffer such that each write will be on underlying connection. 175// Note: A Send call may not directly translate to a write. 176func WriteBufferSize(s int) ServerOption { 177 return newFuncServerOption(func(o *serverOptions) { 178 o.writeBufferSize = s 179 }) 180} 181 182// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 183// for one read syscall. 184// The default value for this buffer is 32KB. 185// Zero will disable read buffer for a connection so data framer can access the underlying 186// conn directly. 
187func ReadBufferSize(s int) ServerOption { 188 return newFuncServerOption(func(o *serverOptions) { 189 o.readBufferSize = s 190 }) 191} 192 193// InitialWindowSize returns a ServerOption that sets window size for stream. 194// The lower bound for window size is 64K and any value smaller than that will be ignored. 195func InitialWindowSize(s int32) ServerOption { 196 return newFuncServerOption(func(o *serverOptions) { 197 o.initialWindowSize = s 198 }) 199} 200 201// InitialConnWindowSize returns a ServerOption that sets window size for a connection. 202// The lower bound for window size is 64K and any value smaller than that will be ignored. 203func InitialConnWindowSize(s int32) ServerOption { 204 return newFuncServerOption(func(o *serverOptions) { 205 o.initialConnWindowSize = s 206 }) 207} 208 209// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 210func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 211 if kp.Time > 0 && kp.Time < time.Second { 212 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s") 213 kp.Time = time.Second 214 } 215 216 return newFuncServerOption(func(o *serverOptions) { 217 o.keepaliveParams = kp 218 }) 219} 220 221// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 222func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 223 return newFuncServerOption(func(o *serverOptions) { 224 o.keepalivePolicy = kep 225 }) 226} 227 228// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 229// 230// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 231func CustomCodec(codec Codec) ServerOption { 232 return newFuncServerOption(func(o *serverOptions) { 233 o.codec = codec 234 }) 235} 236 237// RPCCompressor returns a ServerOption that sets a compressor for outbound 238// messages. 
For backward compatibility, all outbound messages will be sent 239// using this compressor, regardless of incoming message compression. By 240// default, server messages will be sent using the same compressor with which 241// request messages were sent. 242// 243// Deprecated: use encoding.RegisterCompressor instead. 244func RPCCompressor(cp Compressor) ServerOption { 245 return newFuncServerOption(func(o *serverOptions) { 246 o.cp = cp 247 }) 248} 249 250// RPCDecompressor returns a ServerOption that sets a decompressor for inbound 251// messages. It has higher priority than decompressors registered via 252// encoding.RegisterCompressor. 253// 254// Deprecated: use encoding.RegisterCompressor instead. 255func RPCDecompressor(dc Decompressor) ServerOption { 256 return newFuncServerOption(func(o *serverOptions) { 257 o.dc = dc 258 }) 259} 260 261// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 262// If this is not set, gRPC uses the default limit. 263// 264// Deprecated: use MaxRecvMsgSize instead. 265func MaxMsgSize(m int) ServerOption { 266 return MaxRecvMsgSize(m) 267} 268 269// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 270// If this is not set, gRPC uses the default 4MB. 271func MaxRecvMsgSize(m int) ServerOption { 272 return newFuncServerOption(func(o *serverOptions) { 273 o.maxReceiveMessageSize = m 274 }) 275} 276 277// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 278// If this is not set, gRPC uses the default `math.MaxInt32`. 279func MaxSendMsgSize(m int) ServerOption { 280 return newFuncServerOption(func(o *serverOptions) { 281 o.maxSendMessageSize = m 282 }) 283} 284 285// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 286// of concurrent streams to each ServerTransport. 
287func MaxConcurrentStreams(n uint32) ServerOption { 288 return newFuncServerOption(func(o *serverOptions) { 289 o.maxConcurrentStreams = n 290 }) 291} 292 293// Creds returns a ServerOption that sets credentials for server connections. 294func Creds(c credentials.TransportCredentials) ServerOption { 295 return newFuncServerOption(func(o *serverOptions) { 296 o.creds = c 297 }) 298} 299 300// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 301// server. Only one unary interceptor can be installed. The construction of multiple 302// interceptors (e.g., chaining) can be implemented at the caller. 303func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { 304 return newFuncServerOption(func(o *serverOptions) { 305 if o.unaryInt != nil { 306 panic("The unary server interceptor was already set and may not be reset.") 307 } 308 o.unaryInt = i 309 }) 310} 311 312// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the 313// server. Only one stream interceptor can be installed. 314func StreamInterceptor(i StreamServerInterceptor) ServerOption { 315 return newFuncServerOption(func(o *serverOptions) { 316 if o.streamInt != nil { 317 panic("The stream server interceptor was already set and may not be reset.") 318 } 319 o.streamInt = i 320 }) 321} 322 323// InTapHandle returns a ServerOption that sets the tap handle for all the server 324// transport to be created. Only one can be installed. 325func InTapHandle(h tap.ServerInHandle) ServerOption { 326 return newFuncServerOption(func(o *serverOptions) { 327 if o.inTapHandle != nil { 328 panic("The tap handle was already set and may not be reset.") 329 } 330 o.inTapHandle = h 331 }) 332} 333 334// StatsHandler returns a ServerOption that sets the stats handler for the server. 
335func StatsHandler(h stats.Handler) ServerOption { 336 return newFuncServerOption(func(o *serverOptions) { 337 o.statsHandler = h 338 }) 339} 340 341// UnknownServiceHandler returns a ServerOption that allows for adding a custom 342// unknown service handler. The provided method is a bidi-streaming RPC service 343// handler that will be invoked instead of returning the "unimplemented" gRPC 344// error whenever a request is received for an unregistered service or method. 345// The handling function has full access to the Context of the request and the 346// stream, and the invocation bypasses interceptors. 347func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 348 return newFuncServerOption(func(o *serverOptions) { 349 o.unknownStreamDesc = &StreamDesc{ 350 StreamName: "unknown_service_handler", 351 Handler: streamHandler, 352 // We need to assume that the users of the streamHandler will want to use both. 353 ClientStreams: true, 354 ServerStreams: true, 355 } 356 }) 357} 358 359// ConnectionTimeout returns a ServerOption that sets the timeout for 360// connection establishment (up to and including HTTP/2 handshaking) for all 361// new connections. If this is not set, the default is 120 seconds. A zero or 362// negative value will result in an immediate timeout. 363// 364// This API is EXPERIMENTAL. 365func ConnectionTimeout(d time.Duration) ServerOption { 366 return newFuncServerOption(func(o *serverOptions) { 367 o.connectionTimeout = d 368 }) 369} 370 371// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size 372// of header list that the server is prepared to accept. 373func MaxHeaderListSize(s uint32) ServerOption { 374 return newFuncServerOption(func(o *serverOptions) { 375 o.maxHeaderListSize = &s 376 }) 377} 378 379// NewServer creates a gRPC server which has no service registered and has not 380// started to accept requests yet. 
381func NewServer(opt ...ServerOption) *Server { 382 opts := defaultServerOptions 383 for _, o := range opt { 384 o.apply(&opts) 385 } 386 s := &Server{ 387 lis: make(map[net.Listener]bool), 388 opts: opts, 389 conns: make(map[transport.ServerTransport]bool), 390 m: make(map[string]*service), 391 quit: make(chan struct{}), 392 done: make(chan struct{}), 393 czData: new(channelzData), 394 } 395 s.cv = sync.NewCond(&s.mu) 396 if EnableTracing { 397 _, file, line, _ := runtime.Caller(1) 398 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 399 } 400 401 if channelz.IsOn() { 402 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") 403 } 404 return s 405} 406 407// printf records an event in s's event log, unless s has been stopped. 408// REQUIRES s.mu is held. 409func (s *Server) printf(format string, a ...interface{}) { 410 if s.events != nil { 411 s.events.Printf(format, a...) 412 } 413} 414 415// errorf records an error in s's event log, unless s has been stopped. 416// REQUIRES s.mu is held. 417func (s *Server) errorf(format string, a ...interface{}) { 418 if s.events != nil { 419 s.events.Errorf(format, a...) 420 } 421} 422 423// RegisterService registers a service and its implementation to the gRPC 424// server. It is called from the IDL generated code. This must be called before 425// invoking Serve. 
426func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { 427 ht := reflect.TypeOf(sd.HandlerType).Elem() 428 st := reflect.TypeOf(ss) 429 if !st.Implements(ht) { 430 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) 431 } 432 s.register(sd, ss) 433} 434 435func (s *Server) register(sd *ServiceDesc, ss interface{}) { 436 s.mu.Lock() 437 defer s.mu.Unlock() 438 s.printf("RegisterService(%q)", sd.ServiceName) 439 if s.serve { 440 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) 441 } 442 if _, ok := s.m[sd.ServiceName]; ok { 443 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) 444 } 445 srv := &service{ 446 server: ss, 447 md: make(map[string]*MethodDesc), 448 sd: make(map[string]*StreamDesc), 449 mdata: sd.Metadata, 450 } 451 for i := range sd.Methods { 452 d := &sd.Methods[i] 453 srv.md[d.MethodName] = d 454 } 455 for i := range sd.Streams { 456 d := &sd.Streams[i] 457 srv.sd[d.StreamName] = d 458 } 459 s.m[sd.ServiceName] = srv 460} 461 462// MethodInfo contains the information of an RPC including its method name and type. 463type MethodInfo struct { 464 // Name is the method name only, without the service name or package name. 465 Name string 466 // IsClientStream indicates whether the RPC is a client streaming RPC. 467 IsClientStream bool 468 // IsServerStream indicates whether the RPC is a server streaming RPC. 469 IsServerStream bool 470} 471 472// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. 473type ServiceInfo struct { 474 Methods []MethodInfo 475 // Metadata is the metadata specified in ServiceDesc when registering service. 476 Metadata interface{} 477} 478 479// GetServiceInfo returns a map from service names to ServiceInfo. 480// Service names include the package names, in the form of <package>.<service>. 
481func (s *Server) GetServiceInfo() map[string]ServiceInfo { 482 ret := make(map[string]ServiceInfo) 483 for n, srv := range s.m { 484 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) 485 for m := range srv.md { 486 methods = append(methods, MethodInfo{ 487 Name: m, 488 IsClientStream: false, 489 IsServerStream: false, 490 }) 491 } 492 for m, d := range srv.sd { 493 methods = append(methods, MethodInfo{ 494 Name: m, 495 IsClientStream: d.ClientStreams, 496 IsServerStream: d.ServerStreams, 497 }) 498 } 499 500 ret[n] = ServiceInfo{ 501 Methods: methods, 502 Metadata: srv.mdata, 503 } 504 } 505 return ret 506} 507 508// ErrServerStopped indicates that the operation is now illegal because of 509// the server being stopped. 510var ErrServerStopped = errors.New("grpc: the server has been stopped") 511 512func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 513 if s.opts.creds == nil { 514 return rawConn, nil, nil 515 } 516 return s.opts.creds.ServerHandshake(rawConn) 517} 518 519type listenSocket struct { 520 net.Listener 521 channelzID int64 522} 523 524func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { 525 return &channelz.SocketInternalMetric{ 526 SocketOptions: channelz.GetSocketOption(l.Listener), 527 LocalAddr: l.Listener.Addr(), 528 } 529} 530 531func (l *listenSocket) Close() error { 532 err := l.Listener.Close() 533 if channelz.IsOn() { 534 channelz.RemoveEntry(l.channelzID) 535 } 536 return err 537} 538 539// Serve accepts incoming connections on the listener lis, creating a new 540// ServerTransport and service goroutine for each. The service goroutines 541// read gRPC requests and then call the registered handlers to reply to them. 542// Serve returns when lis.Accept fails with fatal errors. lis will be closed when 543// this method returns. 544// Serve will return a non-nil error unless Stop or GracefulStop is called. 
545func (s *Server) Serve(lis net.Listener) error { 546 s.mu.Lock() 547 s.printf("serving") 548 s.serve = true 549 if s.lis == nil { 550 // Serve called after Stop or GracefulStop. 551 s.mu.Unlock() 552 lis.Close() 553 return ErrServerStopped 554 } 555 556 s.serveWG.Add(1) 557 defer func() { 558 s.serveWG.Done() 559 select { 560 // Stop or GracefulStop called; block until done and return nil. 561 case <-s.quit: 562 <-s.done 563 default: 564 } 565 }() 566 567 ls := &listenSocket{Listener: lis} 568 s.lis[ls] = true 569 570 if channelz.IsOn() { 571 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String()) 572 } 573 s.mu.Unlock() 574 575 defer func() { 576 s.mu.Lock() 577 if s.lis != nil && s.lis[ls] { 578 ls.Close() 579 delete(s.lis, ls) 580 } 581 s.mu.Unlock() 582 }() 583 584 var tempDelay time.Duration // how long to sleep on accept failure 585 586 for { 587 rawConn, err := lis.Accept() 588 if err != nil { 589 if ne, ok := err.(interface { 590 Temporary() bool 591 }); ok && ne.Temporary() { 592 if tempDelay == 0 { 593 tempDelay = 5 * time.Millisecond 594 } else { 595 tempDelay *= 2 596 } 597 if max := 1 * time.Second; tempDelay > max { 598 tempDelay = max 599 } 600 s.mu.Lock() 601 s.printf("Accept error: %v; retrying in %v", err, tempDelay) 602 s.mu.Unlock() 603 timer := time.NewTimer(tempDelay) 604 select { 605 case <-timer.C: 606 case <-s.quit: 607 timer.Stop() 608 return nil 609 } 610 continue 611 } 612 s.mu.Lock() 613 s.printf("done serving; Accept = %v", err) 614 s.mu.Unlock() 615 616 select { 617 case <-s.quit: 618 return nil 619 default: 620 } 621 return err 622 } 623 tempDelay = 0 624 // Start a new goroutine to deal with rawConn so we don't stall this Accept 625 // loop goroutine. 626 // 627 // Make sure we account for the goroutine so GracefulStop doesn't nil out 628 // s.conns before this conn can be added. 
629 s.serveWG.Add(1) 630 go func() { 631 s.handleRawConn(rawConn) 632 s.serveWG.Done() 633 }() 634 } 635} 636 637// handleRawConn forks a goroutine to handle a just-accepted connection that 638// has not had any I/O performed on it yet. 639func (s *Server) handleRawConn(rawConn net.Conn) { 640 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) 641 conn, authInfo, err := s.useTransportAuthenticator(rawConn) 642 if err != nil { 643 // ErrConnDispatched means that the connection was dispatched away from 644 // gRPC; those connections should be left open. 645 if err != credentials.ErrConnDispatched { 646 s.mu.Lock() 647 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) 648 s.mu.Unlock() 649 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) 650 rawConn.Close() 651 } 652 rawConn.SetDeadline(time.Time{}) 653 return 654 } 655 656 s.mu.Lock() 657 if s.conns == nil { 658 s.mu.Unlock() 659 conn.Close() 660 return 661 } 662 s.mu.Unlock() 663 664 // Finish handshaking (HTTP2) 665 st := s.newHTTP2Transport(conn, authInfo) 666 if st == nil { 667 return 668 } 669 670 rawConn.SetDeadline(time.Time{}) 671 if !s.addConn(st) { 672 return 673 } 674 go func() { 675 s.serveStreams(st) 676 s.removeConn(st) 677 }() 678} 679 680// newHTTP2Transport sets up a http/2 transport (using the 681// gRPC http2 server transport in transport/http2_server.go). 
682func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { 683 config := &transport.ServerConfig{ 684 MaxStreams: s.opts.maxConcurrentStreams, 685 AuthInfo: authInfo, 686 InTapHandle: s.opts.inTapHandle, 687 StatsHandler: s.opts.statsHandler, 688 KeepaliveParams: s.opts.keepaliveParams, 689 KeepalivePolicy: s.opts.keepalivePolicy, 690 InitialWindowSize: s.opts.initialWindowSize, 691 InitialConnWindowSize: s.opts.initialConnWindowSize, 692 WriteBufferSize: s.opts.writeBufferSize, 693 ReadBufferSize: s.opts.readBufferSize, 694 ChannelzParentID: s.channelzID, 695 MaxHeaderListSize: s.opts.maxHeaderListSize, 696 } 697 st, err := transport.NewServerTransport("http2", c, config) 698 if err != nil { 699 s.mu.Lock() 700 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) 701 s.mu.Unlock() 702 c.Close() 703 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) 704 return nil 705 } 706 707 return st 708} 709 710func (s *Server) serveStreams(st transport.ServerTransport) { 711 defer st.Close() 712 var wg sync.WaitGroup 713 st.HandleStreams(func(stream *transport.Stream) { 714 wg.Add(1) 715 go func() { 716 defer wg.Done() 717 s.handleStream(st, stream, s.traceInfo(st, stream)) 718 }() 719 }, func(ctx context.Context, method string) context.Context { 720 if !EnableTracing { 721 return ctx 722 } 723 tr := trace.New("grpc.Recv."+methodFamily(method), method) 724 return trace.NewContext(ctx, tr) 725 }) 726 wg.Wait() 727} 728 729var _ http.Handler = (*Server)(nil) 730 731// ServeHTTP implements the Go standard library's http.Handler 732// interface by responding to the gRPC request r, by looking up 733// the requested gRPC method in the gRPC server s. 734// 735// The provided HTTP request must have arrived on an HTTP/2 736// connection. When using the Go standard library's server, 737// practically this means that the Request must also have arrived 738// over TLS. 
739// 740// To share one port (such as 443 for https) between gRPC and an 741// existing http.Handler, use a root http.Handler such as: 742// 743// if r.ProtoMajor == 2 && strings.HasPrefix( 744// r.Header.Get("Content-Type"), "application/grpc") { 745// grpcServer.ServeHTTP(w, r) 746// } else { 747// yourMux.ServeHTTP(w, r) 748// } 749// 750// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally 751// separate from grpc-go's HTTP/2 server. Performance and features may vary 752// between the two paths. ServeHTTP does not support some gRPC features 753// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL 754// and subject to change. 755func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 756 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) 757 if err != nil { 758 http.Error(w, err.Error(), http.StatusInternalServerError) 759 return 760 } 761 if !s.addConn(st) { 762 return 763 } 764 defer s.removeConn(st) 765 s.serveStreams(st) 766} 767 768// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. 769// If tracing is not enabled, it returns nil. 770func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { 771 tr, ok := trace.FromContext(stream.Context()) 772 if !ok { 773 return nil 774 } 775 776 trInfo = &traceInfo{ 777 tr: tr, 778 firstLine: firstLine{ 779 client: false, 780 remoteAddr: st.RemoteAddr(), 781 }, 782 } 783 if dl, ok := stream.Context().Deadline(); ok { 784 trInfo.firstLine.deadline = time.Until(dl) 785 } 786 return trInfo 787} 788 789func (s *Server) addConn(st transport.ServerTransport) bool { 790 s.mu.Lock() 791 defer s.mu.Unlock() 792 if s.conns == nil { 793 st.Close() 794 return false 795 } 796 if s.drain { 797 // Transport added after we drained our existing conns: drain it 798 // immediately. 
799 st.Drain() 800 } 801 s.conns[st] = true 802 return true 803} 804 805func (s *Server) removeConn(st transport.ServerTransport) { 806 s.mu.Lock() 807 defer s.mu.Unlock() 808 if s.conns != nil { 809 delete(s.conns, st) 810 s.cv.Broadcast() 811 } 812} 813 814func (s *Server) channelzMetric() *channelz.ServerInternalMetric { 815 return &channelz.ServerInternalMetric{ 816 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted), 817 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded), 818 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed), 819 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)), 820 } 821} 822 823func (s *Server) incrCallsStarted() { 824 atomic.AddInt64(&s.czData.callsStarted, 1) 825 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano()) 826} 827 828func (s *Server) incrCallsSucceeded() { 829 atomic.AddInt64(&s.czData.callsSucceeded, 1) 830} 831 832func (s *Server) incrCallsFailed() { 833 atomic.AddInt64(&s.czData.callsFailed, 1) 834} 835 836func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { 837 data, err := encode(s.getCodec(stream.ContentSubtype()), msg) 838 if err != nil { 839 grpclog.Errorln("grpc: server failed to encode response: ", err) 840 return err 841 } 842 compData, err := compress(data, cp, comp) 843 if err != nil { 844 grpclog.Errorln("grpc: server failed to compress response: ", err) 845 return err 846 } 847 hdr, payload := msgHeader(data, compData) 848 // TODO(dfawley): should we be checking len(data) instead? 849 if len(payload) > s.opts.maxSendMessageSize { 850 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. 
%d)", len(payload), s.opts.maxSendMessageSize) 851 } 852 err = t.Write(stream, hdr, payload, opts) 853 if err == nil && s.opts.statsHandler != nil { 854 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) 855 } 856 return err 857} 858 859func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { 860 if channelz.IsOn() { 861 s.incrCallsStarted() 862 defer func() { 863 if err != nil && err != io.EOF { 864 s.incrCallsFailed() 865 } else { 866 s.incrCallsSucceeded() 867 } 868 }() 869 } 870 sh := s.opts.statsHandler 871 if sh != nil { 872 beginTime := time.Now() 873 begin := &stats.Begin{ 874 BeginTime: beginTime, 875 } 876 sh.HandleRPC(stream.Context(), begin) 877 defer func() { 878 end := &stats.End{ 879 BeginTime: beginTime, 880 EndTime: time.Now(), 881 } 882 if err != nil && err != io.EOF { 883 end.Error = toRPCErr(err) 884 } 885 sh.HandleRPC(stream.Context(), end) 886 }() 887 } 888 if trInfo != nil { 889 defer trInfo.tr.Finish() 890 trInfo.tr.LazyLog(&trInfo.firstLine, false) 891 defer func() { 892 if err != nil && err != io.EOF { 893 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 894 trInfo.tr.SetError() 895 } 896 }() 897 } 898 899 binlog := binarylog.GetMethodLogger(stream.Method()) 900 if binlog != nil { 901 ctx := stream.Context() 902 md, _ := metadata.FromIncomingContext(ctx) 903 logEntry := &binarylog.ClientHeader{ 904 Header: md, 905 MethodName: stream.Method(), 906 PeerAddr: nil, 907 } 908 if deadline, ok := ctx.Deadline(); ok { 909 logEntry.Timeout = time.Until(deadline) 910 if logEntry.Timeout < 0 { 911 logEntry.Timeout = 0 912 } 913 } 914 if a := md[":authority"]; len(a) > 0 { 915 logEntry.Authority = a[0] 916 } 917 if peer, ok := peer.FromContext(ctx); ok { 918 logEntry.PeerAddr = peer.Addr 919 } 920 binlog.Log(logEntry) 921 } 922 923 // comp and cp are used for compression. 
decomp and dc are used for 924 // decompression. If comp and decomp are both set, they are the same; 925 // however they are kept separate to ensure that at most one of the 926 // compressor/decompressor variable pairs are set for use later. 927 var comp, decomp encoding.Compressor 928 var cp Compressor 929 var dc Decompressor 930 931 // If dc is set and matches the stream's compression, use it. Otherwise, try 932 // to find a matching registered compressor for decomp. 933 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 934 dc = s.opts.dc 935 } else if rc != "" && rc != encoding.Identity { 936 decomp = encoding.GetCompressor(rc) 937 if decomp == nil { 938 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 939 t.WriteStatus(stream, st) 940 return st.Err() 941 } 942 } 943 944 // If cp is set, use it. Otherwise, attempt to compress the response using 945 // the incoming message compression method. 946 // 947 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 948 if s.opts.cp != nil { 949 cp = s.opts.cp 950 stream.SetSendCompress(cp.Type()) 951 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 952 // Legacy compressor not specified; attempt to respond with same encoding. 
953 comp = encoding.GetCompressor(rc) 954 if comp != nil { 955 stream.SetSendCompress(rc) 956 } 957 } 958 959 var payInfo *payloadInfo 960 if sh != nil || binlog != nil { 961 payInfo = &payloadInfo{} 962 } 963 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) 964 if err != nil { 965 if st, ok := status.FromError(err); ok { 966 if e := t.WriteStatus(stream, st); e != nil { 967 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 968 } 969 } 970 return err 971 } 972 if channelz.IsOn() { 973 t.IncrMsgRecv() 974 } 975 df := func(v interface{}) error { 976 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { 977 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) 978 } 979 if sh != nil { 980 sh.HandleRPC(stream.Context(), &stats.InPayload{ 981 RecvTime: time.Now(), 982 Payload: v, 983 Data: d, 984 Length: len(d), 985 }) 986 } 987 if binlog != nil { 988 binlog.Log(&binarylog.ClientMessage{ 989 Message: d, 990 }) 991 } 992 if trInfo != nil { 993 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) 994 } 995 return nil 996 } 997 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 998 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) 999 if appErr != nil { 1000 appStatus, ok := status.FromError(appErr) 1001 if !ok { 1002 // Convert appErr if it is not a grpc status error. 1003 appErr = status.Error(codes.Unknown, appErr.Error()) 1004 appStatus, _ = status.FromError(appErr) 1005 } 1006 if trInfo != nil { 1007 trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1008 trInfo.tr.SetError() 1009 } 1010 if e := t.WriteStatus(stream, appStatus); e != nil { 1011 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) 1012 } 1013 if binlog != nil { 1014 if h, _ := stream.Header(); h.Len() > 0 { 1015 // Only log serverHeader if there was header. Otherwise it can 1016 // be trailer only. 
1017 binlog.Log(&binarylog.ServerHeader{ 1018 Header: h, 1019 }) 1020 } 1021 binlog.Log(&binarylog.ServerTrailer{ 1022 Trailer: stream.Trailer(), 1023 Err: appErr, 1024 }) 1025 } 1026 return appErr 1027 } 1028 if trInfo != nil { 1029 trInfo.tr.LazyLog(stringer("OK"), false) 1030 } 1031 opts := &transport.Options{Last: true} 1032 1033 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { 1034 if err == io.EOF { 1035 // The entire stream is done (for unary RPC only). 1036 return err 1037 } 1038 if s, ok := status.FromError(err); ok { 1039 if e := t.WriteStatus(stream, s); e != nil { 1040 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) 1041 } 1042 } else { 1043 switch st := err.(type) { 1044 case transport.ConnectionError: 1045 // Nothing to do here. 1046 default: 1047 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) 1048 } 1049 } 1050 if binlog != nil { 1051 h, _ := stream.Header() 1052 binlog.Log(&binarylog.ServerHeader{ 1053 Header: h, 1054 }) 1055 binlog.Log(&binarylog.ServerTrailer{ 1056 Trailer: stream.Trailer(), 1057 Err: appErr, 1058 }) 1059 } 1060 return err 1061 } 1062 if binlog != nil { 1063 h, _ := stream.Header() 1064 binlog.Log(&binarylog.ServerHeader{ 1065 Header: h, 1066 }) 1067 binlog.Log(&binarylog.ServerMessage{ 1068 Message: reply, 1069 }) 1070 } 1071 if channelz.IsOn() { 1072 t.IncrMsgSent() 1073 } 1074 if trInfo != nil { 1075 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) 1076 } 1077 // TODO: Should we be logging if writing status failed here, like above? 1078 // Should the logging be in WriteStatus? Should we ignore the WriteStatus 1079 // error or allow the stats handler to see it? 
	err = t.WriteStatus(stream, status.New(codes.OK, ""))
	if binlog != nil {
		// The trailer is logged even on success; appErr is nil here.
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

// processStreamingRPC runs a single streaming RPC on stream: it wires up
// compression/decompression, invokes the registered streaming handler (or the
// unknown-stream handler, in which case srv is nil), and writes the final RPC
// status. The named return err is read by the deferred channelz and stats
// bookkeeping below, so all exit paths must go through it.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
		// io.EOF is treated as success: it means the client finished the
		// stream normally.
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	sh := s.opts.statsHandler
	if sh != nil {
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		// Report the End event with the final error (if any) once the RPC
		// completes; again io.EOF is not reported as an error.
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	// Expose the transport stream via the context so grpc.SetHeader etc. work
	// inside the handler.
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		// Log the client header (method, metadata, deadline, peer) as the
		// first binary-log entry for this RPC.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		// Finish the trace on exit; ss.mu guards trInfo.tr against concurrent
		// use by the stream's own methods.
		defer func() {
			ss.mu.Lock()
			if err != nil && err != io.EOF {
				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				ss.trInfo.tr.SetError()
			}
			ss.trInfo.tr.Finish()
			ss.trInfo.tr = nil
			ss.mu.Unlock()
		}()
	}
	var appErr error
	var server interface{}
	// srv is nil when this is the unknown-stream handler; the handler then
	// receives a nil server value.
	if srv != nil {
		server = srv.server
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Non-status errors from the handler become codes.Unknown.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

// handleStream routes an incoming transport stream to the matching unary or
// streaming handler, based on the "/service/method" form of stream.Method().
// Unmatched streams fall through to the unknown-stream handler or an
// Unimplemented status (continued below).
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	// Strip the leading '/' so sm is "service/method".
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		// No '/' separator at all: the method name is malformed.
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		// NOTE(review): codes.ResourceExhausted looks odd for a malformed
		// method name; later grpc-go versions respond with
		// codes.Unimplemented here — confirm before changing.
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.m[service]
	if knownService {
		if md, ok := srv.md[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.sd[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known server unknown method.
	// Fall back to the registered unknown-stream handler, if any; it is run
	// as a streaming RPC with a nil service.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

// streamKey is the context key under which the ServerTransportStream is
// saved. An unexported empty struct avoids collisions with other packages.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// This API is EXPERIMENTAL.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// This API is EXPERIMENTAL.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	// Signal Serve goroutines to exit; safe if GracefulStop already did it.
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	// done is closed only after every Serve goroutine has returned.
	defer func() {
		s.serveWG.Wait()
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	// Snapshot and nil out the listener/connection maps under the lock, then
	// close everything after releasing s.mu.
	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
1399func (s *Server) GracefulStop() { 1400 s.quitOnce.Do(func() { 1401 close(s.quit) 1402 }) 1403 1404 defer func() { 1405 s.doneOnce.Do(func() { 1406 close(s.done) 1407 }) 1408 }() 1409 1410 s.channelzRemoveOnce.Do(func() { 1411 if channelz.IsOn() { 1412 channelz.RemoveEntry(s.channelzID) 1413 } 1414 }) 1415 s.mu.Lock() 1416 if s.conns == nil { 1417 s.mu.Unlock() 1418 return 1419 } 1420 1421 for lis := range s.lis { 1422 lis.Close() 1423 } 1424 s.lis = nil 1425 if !s.drain { 1426 for st := range s.conns { 1427 st.Drain() 1428 } 1429 s.drain = true 1430 } 1431 1432 // Wait for serving threads to be ready to exit. Only then can we be sure no 1433 // new conns will be created. 1434 s.mu.Unlock() 1435 s.serveWG.Wait() 1436 s.mu.Lock() 1437 1438 for len(s.conns) != 0 { 1439 s.cv.Wait() 1440 } 1441 s.conns = nil 1442 if s.events != nil { 1443 s.events.Finish() 1444 s.events = nil 1445 } 1446 s.mu.Unlock() 1447} 1448 1449// contentSubtype must be lowercase 1450// cannot return nil 1451func (s *Server) getCodec(contentSubtype string) baseCodec { 1452 if s.opts.codec != nil { 1453 return s.opts.codec 1454 } 1455 if contentSubtype == "" { 1456 return encoding.GetCodec(proto.Name) 1457 } 1458 codec := encoding.GetCodec(contentSubtype) 1459 if codec == nil { 1460 return encoding.GetCodec(proto.Name) 1461 } 1462 return codec 1463} 1464 1465// SetHeader sets the header metadata. 1466// When called multiple times, all the provided metadata will be merged. 1467// All the metadata will be sent out when one of the following happens: 1468// - grpc.SendHeader() is called; 1469// - The first response is sent out; 1470// - An RPC status is sent out (error or success). 
1471func SetHeader(ctx context.Context, md metadata.MD) error { 1472 if md.Len() == 0 { 1473 return nil 1474 } 1475 stream := ServerTransportStreamFromContext(ctx) 1476 if stream == nil { 1477 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1478 } 1479 return stream.SetHeader(md) 1480} 1481 1482// SendHeader sends header metadata. It may be called at most once. 1483// The provided md and headers set by SetHeader() will be sent. 1484func SendHeader(ctx context.Context, md metadata.MD) error { 1485 stream := ServerTransportStreamFromContext(ctx) 1486 if stream == nil { 1487 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1488 } 1489 if err := stream.SendHeader(md); err != nil { 1490 return toRPCErr(err) 1491 } 1492 return nil 1493} 1494 1495// SetTrailer sets the trailer metadata that will be sent when an RPC returns. 1496// When called more than once, all the provided metadata will be merged. 1497func SetTrailer(ctx context.Context, md metadata.MD) error { 1498 if md.Len() == 0 { 1499 return nil 1500 } 1501 stream := ServerTransportStreamFromContext(ctx) 1502 if stream == nil { 1503 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1504 } 1505 return stream.SetTrailer(md) 1506} 1507 1508// Method returns the method string for the server context. The returned 1509// string is in the format of "/service/method". 1510func Method(ctx context.Context) (string, bool) { 1511 s := ServerTransportStreamFromContext(ctx) 1512 if s == nil { 1513 return "", false 1514 } 1515 return s.Method(), true 1516} 1517 1518type channelzServer struct { 1519 s *Server 1520} 1521 1522func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { 1523 return c.s.channelzMetric() 1524} 1525