1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "errors" 24 "fmt" 25 "io" 26 "math" 27 "net" 28 "net/http" 29 "reflect" 30 "runtime" 31 "strings" 32 "sync" 33 "sync/atomic" 34 "time" 35 36 "golang.org/x/net/trace" 37 38 "google.golang.org/grpc/codes" 39 "google.golang.org/grpc/credentials" 40 "google.golang.org/grpc/encoding" 41 "google.golang.org/grpc/encoding/proto" 42 "google.golang.org/grpc/grpclog" 43 "google.golang.org/grpc/internal" 44 "google.golang.org/grpc/internal/binarylog" 45 "google.golang.org/grpc/internal/channelz" 46 "google.golang.org/grpc/internal/grpcrand" 47 "google.golang.org/grpc/internal/grpcsync" 48 "google.golang.org/grpc/internal/transport" 49 "google.golang.org/grpc/keepalive" 50 "google.golang.org/grpc/metadata" 51 "google.golang.org/grpc/peer" 52 "google.golang.org/grpc/stats" 53 "google.golang.org/grpc/status" 54 "google.golang.org/grpc/tap" 55) 56 57const ( 58 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 59 defaultServerMaxSendMessageSize = math.MaxInt32 60 61 // Server transports are tracked in a map which is keyed on listener 62 // address. For regular gRPC traffic, connections are accepted in Serve() 63 // through a call to Accept(), and we use the actual listener address as key 64 // when we add it to the map. But for connections received through 65 // ServeHTTP(), we do not have a listener and hence use this dummy value. 66 listenerAddressForServeHTTP = "listenerAddressForServeHTTP" 67) 68 69func init() { 70 internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials { 71 return srv.opts.creds 72 } 73 internal.DrainServerTransports = func(srv *Server, addr string) { 74 srv.drainServerTransports(addr) 75 } 76} 77 78var statusOK = status.New(codes.OK, "") 79var logger = grpclog.Component("core") 80 81type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) 82 83// MethodDesc represents an RPC service's method specification. 84type MethodDesc struct { 85 MethodName string 86 Handler methodHandler 87} 88 89// ServiceDesc represents an RPC service's specification. 90type ServiceDesc struct { 91 ServiceName string 92 // The pointer to the service interface. Used to check whether the user 93 // provided implementation satisfies the interface requirements. 94 HandlerType interface{} 95 Methods []MethodDesc 96 Streams []StreamDesc 97 Metadata interface{} 98} 99 100// serviceInfo wraps information about a service. It is very similar to 101// ServiceDesc and is constructed from it for internal purposes. 102type serviceInfo struct { 103 // Contains the implementation for the methods in this service. 104 serviceImpl interface{} 105 methods map[string]*MethodDesc 106 streams map[string]*StreamDesc 107 mdata interface{} 108} 109 110type serverWorkerData struct { 111 st transport.ServerTransport 112 wg *sync.WaitGroup 113 stream *transport.Stream 114} 115 116// Server is a gRPC server to serve RPC requests. 117type Server struct { 118 opts serverOptions 119 120 mu sync.Mutex // guards following 121 lis map[net.Listener]bool 122 // conns contains all active server transports. It is a map keyed on a 123 // listener address with the value being the set of active transports 124 // belonging to that listener. 125 conns map[string]map[transport.ServerTransport]bool 126 serve bool 127 drain bool 128 cv *sync.Cond // signaled when connections close for GracefulStop 129 services map[string]*serviceInfo // service name -> service info 130 events trace.EventLog 131 132 quit *grpcsync.Event 133 done *grpcsync.Event 134 channelzRemoveOnce sync.Once 135 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop 136 137 channelzID int64 // channelz unique identification number 138 czData *channelzData 139 140 serverWorkerChannels []chan *serverWorkerData 141} 142 143type serverOptions struct { 144 creds credentials.TransportCredentials 145 codec baseCodec 146 cp Compressor 147 dc Decompressor 148 unaryInt UnaryServerInterceptor 149 streamInt StreamServerInterceptor 150 chainUnaryInts []UnaryServerInterceptor 151 chainStreamInts []StreamServerInterceptor 152 inTapHandle tap.ServerInHandle 153 statsHandler stats.Handler 154 maxConcurrentStreams uint32 155 maxReceiveMessageSize int 156 maxSendMessageSize int 157 unknownStreamDesc *StreamDesc 158 keepaliveParams keepalive.ServerParameters 159 keepalivePolicy keepalive.EnforcementPolicy 160 initialWindowSize int32 161 initialConnWindowSize int32 162 writeBufferSize int 163 readBufferSize int 164 connectionTimeout time.Duration 165 maxHeaderListSize *uint32 166 headerTableSize *uint32 167 numServerWorkers uint32 168} 169 170var defaultServerOptions = serverOptions{ 171 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, 172 maxSendMessageSize: defaultServerMaxSendMessageSize, 173 connectionTimeout: 120 * time.Second, 174 writeBufferSize: defaultWriteBufSize, 175 readBufferSize: defaultReadBufSize, 176} 177 178// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. 179type ServerOption interface { 180 apply(*serverOptions) 181} 182 183// EmptyServerOption does not alter the server configuration. It can be embedded 184// in another structure to build custom server options. 185// 186// Experimental 187// 188// Notice: This type is EXPERIMENTAL and may be changed or removed in a 189// later release. 190type EmptyServerOption struct{} 191 192func (EmptyServerOption) apply(*serverOptions) {} 193 194// funcServerOption wraps a function that modifies serverOptions into an 195// implementation of the ServerOption interface. 196type funcServerOption struct { 197 f func(*serverOptions) 198} 199 200func (fdo *funcServerOption) apply(do *serverOptions) { 201 fdo.f(do) 202} 203 204func newFuncServerOption(f func(*serverOptions)) *funcServerOption { 205 return &funcServerOption{ 206 f: f, 207 } 208} 209 210// WriteBufferSize determines how much data can be batched before doing a write on the wire. 211// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. 212// The default value for this buffer is 32KB. 213// Zero will disable the write buffer such that each write will be on underlying connection. 214// Note: A Send call may not directly translate to a write. 215func WriteBufferSize(s int) ServerOption { 216 return newFuncServerOption(func(o *serverOptions) { 217 o.writeBufferSize = s 218 }) 219} 220 221// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 222// for one read syscall. 223// The default value for this buffer is 32KB. 224// Zero will disable read buffer for a connection so data framer can access the underlying 225// conn directly. 226func ReadBufferSize(s int) ServerOption { 227 return newFuncServerOption(func(o *serverOptions) { 228 o.readBufferSize = s 229 }) 230} 231 232// InitialWindowSize returns a ServerOption that sets window size for stream. 233// The lower bound for window size is 64K and any value smaller than that will be ignored. 234func InitialWindowSize(s int32) ServerOption { 235 return newFuncServerOption(func(o *serverOptions) { 236 o.initialWindowSize = s 237 }) 238} 239 240// InitialConnWindowSize returns a ServerOption that sets window size for a connection. 241// The lower bound for window size is 64K and any value smaller than that will be ignored. 242func InitialConnWindowSize(s int32) ServerOption { 243 return newFuncServerOption(func(o *serverOptions) { 244 o.initialConnWindowSize = s 245 }) 246} 247 248// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 249func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 250 if kp.Time > 0 && kp.Time < time.Second { 251 logger.Warning("Adjusting keepalive ping interval to minimum period of 1s") 252 kp.Time = time.Second 253 } 254 255 return newFuncServerOption(func(o *serverOptions) { 256 o.keepaliveParams = kp 257 }) 258} 259 260// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 261func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 262 return newFuncServerOption(func(o *serverOptions) { 263 o.keepalivePolicy = kep 264 }) 265} 266 267// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 268// 269// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 270// 271// Deprecated: register codecs using encoding.RegisterCodec. The server will 272// automatically use registered codecs based on the incoming requests' headers. 273// See also 274// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. 275// Will be supported throughout 1.x. 276func CustomCodec(codec Codec) ServerOption { 277 return newFuncServerOption(func(o *serverOptions) { 278 o.codec = codec 279 }) 280} 281 282// ForceServerCodec returns a ServerOption that sets a codec for message 283// marshaling and unmarshaling. 284// 285// This will override any lookups by content-subtype for Codecs registered 286// with RegisterCodec. 287// 288// See Content-Type on 289// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for 290// more details. Also see the documentation on RegisterCodec and 291// CallContentSubtype for more details on the interaction between encoding.Codec 292// and content-subtype. 293// 294// This function is provided for advanced users; prefer to register codecs 295// using encoding.RegisterCodec. 296// The server will automatically use registered codecs based on the incoming 297// requests' headers. See also 298// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. 299// Will be supported throughout 1.x. 300// 301// Experimental 302// 303// Notice: This API is EXPERIMENTAL and may be changed or removed in a 304// later release. 305func ForceServerCodec(codec encoding.Codec) ServerOption { 306 return newFuncServerOption(func(o *serverOptions) { 307 o.codec = codec 308 }) 309} 310 311// RPCCompressor returns a ServerOption that sets a compressor for outbound 312// messages. For backward compatibility, all outbound messages will be sent 313// using this compressor, regardless of incoming message compression. By 314// default, server messages will be sent using the same compressor with which 315// request messages were sent. 316// 317// Deprecated: use encoding.RegisterCompressor instead. Will be supported 318// throughout 1.x. 319func RPCCompressor(cp Compressor) ServerOption { 320 return newFuncServerOption(func(o *serverOptions) { 321 o.cp = cp 322 }) 323} 324 325// RPCDecompressor returns a ServerOption that sets a decompressor for inbound 326// messages. It has higher priority than decompressors registered via 327// encoding.RegisterCompressor. 328// 329// Deprecated: use encoding.RegisterCompressor instead. Will be supported 330// throughout 1.x. 331func RPCDecompressor(dc Decompressor) ServerOption { 332 return newFuncServerOption(func(o *serverOptions) { 333 o.dc = dc 334 }) 335} 336 337// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 338// If this is not set, gRPC uses the default limit. 339// 340// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x. 341func MaxMsgSize(m int) ServerOption { 342 return MaxRecvMsgSize(m) 343} 344 345// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 346// If this is not set, gRPC uses the default 4MB. 347func MaxRecvMsgSize(m int) ServerOption { 348 return newFuncServerOption(func(o *serverOptions) { 349 o.maxReceiveMessageSize = m 350 }) 351} 352 353// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 354// If this is not set, gRPC uses the default `math.MaxInt32`. 355func MaxSendMsgSize(m int) ServerOption { 356 return newFuncServerOption(func(o *serverOptions) { 357 o.maxSendMessageSize = m 358 }) 359} 360 361// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 362// of concurrent streams to each ServerTransport. 363func MaxConcurrentStreams(n uint32) ServerOption { 364 return newFuncServerOption(func(o *serverOptions) { 365 o.maxConcurrentStreams = n 366 }) 367} 368 369// Creds returns a ServerOption that sets credentials for server connections. 370func Creds(c credentials.TransportCredentials) ServerOption { 371 return newFuncServerOption(func(o *serverOptions) { 372 o.creds = c 373 }) 374} 375 376// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 377// server. Only one unary interceptor can be installed. The construction of multiple 378// interceptors (e.g., chaining) can be implemented at the caller. 379func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { 380 return newFuncServerOption(func(o *serverOptions) { 381 if o.unaryInt != nil { 382 panic("The unary server interceptor was already set and may not be reset.") 383 } 384 o.unaryInt = i 385 }) 386} 387 388// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor 389// for unary RPCs. The first interceptor will be the outer most, 390// while the last interceptor will be the inner most wrapper around the real call. 391// All unary interceptors added by this method will be chained. 392func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption { 393 return newFuncServerOption(func(o *serverOptions) { 394 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...) 395 }) 396} 397 398// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the 399// server. Only one stream interceptor can be installed. 400func StreamInterceptor(i StreamServerInterceptor) ServerOption { 401 return newFuncServerOption(func(o *serverOptions) { 402 if o.streamInt != nil { 403 panic("The stream server interceptor was already set and may not be reset.") 404 } 405 o.streamInt = i 406 }) 407} 408 409// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor 410// for streaming RPCs. The first interceptor will be the outer most, 411// while the last interceptor will be the inner most wrapper around the real call. 412// All stream interceptors added by this method will be chained. 413func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption { 414 return newFuncServerOption(func(o *serverOptions) { 415 o.chainStreamInts = append(o.chainStreamInts, interceptors...) 416 }) 417} 418 419// InTapHandle returns a ServerOption that sets the tap handle for all the server 420// transport to be created. Only one can be installed. 421// 422// Experimental 423// 424// Notice: This API is EXPERIMENTAL and may be changed or removed in a 425// later release. 426func InTapHandle(h tap.ServerInHandle) ServerOption { 427 return newFuncServerOption(func(o *serverOptions) { 428 if o.inTapHandle != nil { 429 panic("The tap handle was already set and may not be reset.") 430 } 431 o.inTapHandle = h 432 }) 433} 434 435// StatsHandler returns a ServerOption that sets the stats handler for the server. 436func StatsHandler(h stats.Handler) ServerOption { 437 return newFuncServerOption(func(o *serverOptions) { 438 o.statsHandler = h 439 }) 440} 441 442// UnknownServiceHandler returns a ServerOption that allows for adding a custom 443// unknown service handler. The provided method is a bidi-streaming RPC service 444// handler that will be invoked instead of returning the "unimplemented" gRPC 445// error whenever a request is received for an unregistered service or method. 446// The handling function and stream interceptor (if set) have full access to 447// the ServerStream, including its Context. 448func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 449 return newFuncServerOption(func(o *serverOptions) { 450 o.unknownStreamDesc = &StreamDesc{ 451 StreamName: "unknown_service_handler", 452 Handler: streamHandler, 453 // We need to assume that the users of the streamHandler will want to use both. 454 ClientStreams: true, 455 ServerStreams: true, 456 } 457 }) 458} 459 460// ConnectionTimeout returns a ServerOption that sets the timeout for 461// connection establishment (up to and including HTTP/2 handshaking) for all 462// new connections. If this is not set, the default is 120 seconds. A zero or 463// negative value will result in an immediate timeout. 464// 465// Experimental 466// 467// Notice: This API is EXPERIMENTAL and may be changed or removed in a 468// later release. 469func ConnectionTimeout(d time.Duration) ServerOption { 470 return newFuncServerOption(func(o *serverOptions) { 471 o.connectionTimeout = d 472 }) 473} 474 475// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size 476// of header list that the server is prepared to accept. 477func MaxHeaderListSize(s uint32) ServerOption { 478 return newFuncServerOption(func(o *serverOptions) { 479 o.maxHeaderListSize = &s 480 }) 481} 482 483// HeaderTableSize returns a ServerOption that sets the size of dynamic 484// header table for stream. 485// 486// Experimental 487// 488// Notice: This API is EXPERIMENTAL and may be changed or removed in a 489// later release. 490func HeaderTableSize(s uint32) ServerOption { 491 return newFuncServerOption(func(o *serverOptions) { 492 o.headerTableSize = &s 493 }) 494} 495 496// NumStreamWorkers returns a ServerOption that sets the number of worker 497// goroutines that should be used to process incoming streams. Setting this to 498// zero (default) will disable workers and spawn a new goroutine for each 499// stream. 500// 501// Experimental 502// 503// Notice: This API is EXPERIMENTAL and may be changed or removed in a 504// later release. 505func NumStreamWorkers(numServerWorkers uint32) ServerOption { 506 // TODO: If/when this API gets stabilized (i.e. stream workers become the 507 // only way streams are processed), change the behavior of the zero value to 508 // a sane default. Preliminary experiments suggest that a value equal to the 509 // number of CPUs available is most performant; requires thorough testing. 510 return newFuncServerOption(func(o *serverOptions) { 511 o.numServerWorkers = numServerWorkers 512 }) 513} 514 515// serverWorkerResetThreshold defines how often the stack must be reset. Every 516// N requests, by spawning a new goroutine in its place, a worker can reset its 517// stack so that large stacks don't live in memory forever. 2^16 should allow 518// each goroutine stack to live for at least a few seconds in a typical 519// workload (assuming a QPS of a few thousand requests/sec). 520const serverWorkerResetThreshold = 1 << 16 521 522// serverWorkers blocks on a *transport.Stream channel forever and waits for 523// data to be fed by serveStreams. This allows different requests to be 524// processed by the same goroutine, removing the need for expensive stack 525// re-allocations (see the runtime.morestack problem [1]). 526// 527// [1] https://github.com/golang/go/issues/18138 528func (s *Server) serverWorker(ch chan *serverWorkerData) { 529 // To make sure all server workers don't reset at the same time, choose a 530 // random number of iterations before resetting. 531 threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold) 532 for completed := 0; completed < threshold; completed++ { 533 data, ok := <-ch 534 if !ok { 535 return 536 } 537 s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) 538 data.wg.Done() 539 } 540 go s.serverWorker(ch) 541} 542 543// initServerWorkers creates worker goroutines and channels to process incoming 544// connections to reduce the time spent overall on runtime.morestack. 545func (s *Server) initServerWorkers() { 546 s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers) 547 for i := uint32(0); i < s.opts.numServerWorkers; i++ { 548 s.serverWorkerChannels[i] = make(chan *serverWorkerData) 549 go s.serverWorker(s.serverWorkerChannels[i]) 550 } 551} 552 553func (s *Server) stopServerWorkers() { 554 for i := uint32(0); i < s.opts.numServerWorkers; i++ { 555 close(s.serverWorkerChannels[i]) 556 } 557} 558 559// NewServer creates a gRPC server which has no service registered and has not 560// started to accept requests yet. 561func NewServer(opt ...ServerOption) *Server { 562 opts := defaultServerOptions 563 for _, o := range opt { 564 o.apply(&opts) 565 } 566 s := &Server{ 567 lis: make(map[net.Listener]bool), 568 opts: opts, 569 conns: make(map[string]map[transport.ServerTransport]bool), 570 services: make(map[string]*serviceInfo), 571 quit: grpcsync.NewEvent(), 572 done: grpcsync.NewEvent(), 573 czData: new(channelzData), 574 } 575 chainUnaryServerInterceptors(s) 576 chainStreamServerInterceptors(s) 577 s.cv = sync.NewCond(&s.mu) 578 if EnableTracing { 579 _, file, line, _ := runtime.Caller(1) 580 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 581 } 582 583 if s.opts.numServerWorkers > 0 { 584 s.initServerWorkers() 585 } 586 587 if channelz.IsOn() { 588 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") 589 } 590 return s 591} 592 593// printf records an event in s's event log, unless s has been stopped. 594// REQUIRES s.mu is held. 595func (s *Server) printf(format string, a ...interface{}) { 596 if s.events != nil { 597 s.events.Printf(format, a...) 598 } 599} 600 601// errorf records an error in s's event log, unless s has been stopped. 602// REQUIRES s.mu is held. 603func (s *Server) errorf(format string, a ...interface{}) { 604 if s.events != nil { 605 s.events.Errorf(format, a...) 606 } 607} 608 609// ServiceRegistrar wraps a single method that supports service registration. It 610// enables users to pass concrete types other than grpc.Server to the service 611// registration methods exported by the IDL generated code. 612type ServiceRegistrar interface { 613 // RegisterService registers a service and its implementation to the 614 // concrete type implementing this interface. It may not be called 615 // once the server has started serving. 616 // desc describes the service and its methods and handlers. impl is the 617 // service implementation which is passed to the method handlers. 618 RegisterService(desc *ServiceDesc, impl interface{}) 619} 620 621// RegisterService registers a service and its implementation to the gRPC 622// server. It is called from the IDL generated code. This must be called before 623// invoking Serve. If ss is non-nil (for legacy code), its type is checked to 624// ensure it implements sd.HandlerType. 625func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { 626 if ss != nil { 627 ht := reflect.TypeOf(sd.HandlerType).Elem() 628 st := reflect.TypeOf(ss) 629 if !st.Implements(ht) { 630 logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) 631 } 632 } 633 s.register(sd, ss) 634} 635 636func (s *Server) register(sd *ServiceDesc, ss interface{}) { 637 s.mu.Lock() 638 defer s.mu.Unlock() 639 s.printf("RegisterService(%q)", sd.ServiceName) 640 if s.serve { 641 logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) 642 } 643 if _, ok := s.services[sd.ServiceName]; ok { 644 logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) 645 } 646 info := &serviceInfo{ 647 serviceImpl: ss, 648 methods: make(map[string]*MethodDesc), 649 streams: make(map[string]*StreamDesc), 650 mdata: sd.Metadata, 651 } 652 for i := range sd.Methods { 653 d := &sd.Methods[i] 654 info.methods[d.MethodName] = d 655 } 656 for i := range sd.Streams { 657 d := &sd.Streams[i] 658 info.streams[d.StreamName] = d 659 } 660 s.services[sd.ServiceName] = info 661} 662 663// MethodInfo contains the information of an RPC including its method name and type. 664type MethodInfo struct { 665 // Name is the method name only, without the service name or package name. 666 Name string 667 // IsClientStream indicates whether the RPC is a client streaming RPC. 668 IsClientStream bool 669 // IsServerStream indicates whether the RPC is a server streaming RPC. 670 IsServerStream bool 671} 672 673// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. 674type ServiceInfo struct { 675 Methods []MethodInfo 676 // Metadata is the metadata specified in ServiceDesc when registering service. 677 Metadata interface{} 678} 679 680// GetServiceInfo returns a map from service names to ServiceInfo. 681// Service names include the package names, in the form of <package>.<service>. 682func (s *Server) GetServiceInfo() map[string]ServiceInfo { 683 ret := make(map[string]ServiceInfo) 684 for n, srv := range s.services { 685 methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams)) 686 for m := range srv.methods { 687 methods = append(methods, MethodInfo{ 688 Name: m, 689 IsClientStream: false, 690 IsServerStream: false, 691 }) 692 } 693 for m, d := range srv.streams { 694 methods = append(methods, MethodInfo{ 695 Name: m, 696 IsClientStream: d.ClientStreams, 697 IsServerStream: d.ServerStreams, 698 }) 699 } 700 701 ret[n] = ServiceInfo{ 702 Methods: methods, 703 Metadata: srv.mdata, 704 } 705 } 706 return ret 707} 708 709// ErrServerStopped indicates that the operation is now illegal because of 710// the server being stopped. 711var ErrServerStopped = errors.New("grpc: the server has been stopped") 712 713type listenSocket struct { 714 net.Listener 715 channelzID int64 716} 717 718func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { 719 return &channelz.SocketInternalMetric{ 720 SocketOptions: channelz.GetSocketOption(l.Listener), 721 LocalAddr: l.Listener.Addr(), 722 } 723} 724 725func (l *listenSocket) Close() error { 726 err := l.Listener.Close() 727 if channelz.IsOn() { 728 channelz.RemoveEntry(l.channelzID) 729 } 730 return err 731} 732 733// Serve accepts incoming connections on the listener lis, creating a new 734// ServerTransport and service goroutine for each. The service goroutines 735// read gRPC requests and then call the registered handlers to reply to them. 736// Serve returns when lis.Accept fails with fatal errors. lis will be closed when 737// this method returns. 738// Serve will return a non-nil error unless Stop or GracefulStop is called. 739func (s *Server) Serve(lis net.Listener) error { 740 s.mu.Lock() 741 s.printf("serving") 742 s.serve = true 743 if s.lis == nil { 744 // Serve called after Stop or GracefulStop. 745 s.mu.Unlock() 746 lis.Close() 747 return ErrServerStopped 748 } 749 750 s.serveWG.Add(1) 751 defer func() { 752 s.serveWG.Done() 753 if s.quit.HasFired() { 754 // Stop or GracefulStop called; block until done and return nil. 755 <-s.done.Done() 756 } 757 }() 758 759 ls := &listenSocket{Listener: lis} 760 s.lis[ls] = true 761 762 if channelz.IsOn() { 763 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String()) 764 } 765 s.mu.Unlock() 766 767 defer func() { 768 s.mu.Lock() 769 if s.lis != nil && s.lis[ls] { 770 ls.Close() 771 delete(s.lis, ls) 772 } 773 s.mu.Unlock() 774 }() 775 776 var tempDelay time.Duration // how long to sleep on accept failure 777 778 for { 779 rawConn, err := lis.Accept() 780 if err != nil { 781 if ne, ok := err.(interface { 782 Temporary() bool 783 }); ok && ne.Temporary() { 784 if tempDelay == 0 { 785 tempDelay = 5 * time.Millisecond 786 } else { 787 tempDelay *= 2 788 } 789 if max := 1 * time.Second; tempDelay > max { 790 tempDelay = max 791 } 792 s.mu.Lock() 793 s.printf("Accept error: %v; retrying in %v", err, tempDelay) 794 s.mu.Unlock() 795 timer := time.NewTimer(tempDelay) 796 select { 797 case <-timer.C: 798 case <-s.quit.Done(): 799 timer.Stop() 800 return nil 801 } 802 continue 803 } 804 s.mu.Lock() 805 s.printf("done serving; Accept = %v", err) 806 s.mu.Unlock() 807 808 if s.quit.HasFired() { 809 return nil 810 } 811 return err 812 } 813 tempDelay = 0 814 // Start a new goroutine to deal with rawConn so we don't stall this Accept 815 // loop goroutine. 816 // 817 // Make sure we account for the goroutine so GracefulStop doesn't nil out 818 // s.conns before this conn can be added. 819 s.serveWG.Add(1) 820 go func() { 821 s.handleRawConn(lis.Addr().String(), rawConn) 822 s.serveWG.Done() 823 }() 824 } 825} 826 827// handleRawConn forks a goroutine to handle a just-accepted connection that 828// has not had any I/O performed on it yet. 829func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { 830 if s.quit.HasFired() { 831 rawConn.Close() 832 return 833 } 834 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) 835 836 // Finish handshaking (HTTP2) 837 st := s.newHTTP2Transport(rawConn) 838 rawConn.SetDeadline(time.Time{}) 839 if st == nil { 840 return 841 } 842 843 if !s.addConn(lisAddr, st) { 844 return 845 } 846 go func() { 847 s.serveStreams(st) 848 s.removeConn(lisAddr, st) 849 }() 850} 851 852func (s *Server) drainServerTransports(addr string) { 853 s.mu.Lock() 854 conns := s.conns[addr] 855 for st := range conns { 856 st.Drain() 857 } 858 s.mu.Unlock() 859} 860 861// newHTTP2Transport sets up a http/2 transport (using the 862// gRPC http2 server transport in transport/http2_server.go). 863func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { 864 config := &transport.ServerConfig{ 865 MaxStreams: s.opts.maxConcurrentStreams, 866 ConnectionTimeout: s.opts.connectionTimeout, 867 Credentials: s.opts.creds, 868 InTapHandle: s.opts.inTapHandle, 869 StatsHandler: s.opts.statsHandler, 870 KeepaliveParams: s.opts.keepaliveParams, 871 KeepalivePolicy: s.opts.keepalivePolicy, 872 InitialWindowSize: s.opts.initialWindowSize, 873 InitialConnWindowSize: s.opts.initialConnWindowSize, 874 WriteBufferSize: s.opts.writeBufferSize, 875 ReadBufferSize: s.opts.readBufferSize, 876 ChannelzParentID: s.channelzID, 877 MaxHeaderListSize: s.opts.maxHeaderListSize, 878 HeaderTableSize: s.opts.headerTableSize, 879 } 880 st, err := transport.NewServerTransport(c, config) 881 if err != nil { 882 s.mu.Lock() 883 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) 884 s.mu.Unlock() 885 // ErrConnDispatched means that the connection was dispatched away from 886 // gRPC; those connections should be left open. 887 if err != credentials.ErrConnDispatched { 888 c.Close() 889 } 890 // Don't log on ErrConnDispatched and io.EOF to prevent log spam. 891 if err != credentials.ErrConnDispatched { 892 if err != io.EOF { 893 channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) 894 } 895 } 896 return nil 897 } 898 899 return st 900} 901 902func (s *Server) serveStreams(st transport.ServerTransport) { 903 defer st.Close() 904 var wg sync.WaitGroup 905 906 var roundRobinCounter uint32 907 st.HandleStreams(func(stream *transport.Stream) { 908 wg.Add(1) 909 if s.opts.numServerWorkers > 0 { 910 data := &serverWorkerData{st: st, wg: &wg, stream: stream} 911 select { 912 case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data: 913 default: 914 // If all stream workers are busy, fallback to the default code path. 915 go func() { 916 s.handleStream(st, stream, s.traceInfo(st, stream)) 917 wg.Done() 918 }() 919 } 920 } else { 921 go func() { 922 defer wg.Done() 923 s.handleStream(st, stream, s.traceInfo(st, stream)) 924 }() 925 } 926 }, func(ctx context.Context, method string) context.Context { 927 if !EnableTracing { 928 return ctx 929 } 930 tr := trace.New("grpc.Recv."+methodFamily(method), method) 931 return trace.NewContext(ctx, tr) 932 }) 933 wg.Wait() 934} 935 936var _ http.Handler = (*Server)(nil) 937 938// ServeHTTP implements the Go standard library's http.Handler 939// interface by responding to the gRPC request r, by looking up 940// the requested gRPC method in the gRPC server s. 941// 942// The provided HTTP request must have arrived on an HTTP/2 943// connection. When using the Go standard library's server, 944// practically this means that the Request must also have arrived 945// over TLS. 946// 947// To share one port (such as 443 for https) between gRPC and an 948// existing http.Handler, use a root http.Handler such as: 949// 950// if r.ProtoMajor == 2 && strings.HasPrefix( 951// r.Header.Get("Content-Type"), "application/grpc") { 952// grpcServer.ServeHTTP(w, r) 953// } else { 954// yourMux.ServeHTTP(w, r) 955// } 956// 957// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally 958// separate from grpc-go's HTTP/2 server. Performance and features may vary 959// between the two paths. ServeHTTP does not support some gRPC features 960// available through grpc-go's HTTP/2 server. 961// 962// Experimental 963// 964// Notice: This API is EXPERIMENTAL and may be changed or removed in a 965// later release. 966func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 967 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) 968 if err != nil { 969 http.Error(w, err.Error(), http.StatusInternalServerError) 970 return 971 } 972 if !s.addConn(listenerAddressForServeHTTP, st) { 973 return 974 } 975 defer s.removeConn(listenerAddressForServeHTTP, st) 976 s.serveStreams(st) 977} 978 979// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. 980// If tracing is not enabled, it returns nil. 981func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { 982 if !EnableTracing { 983 return nil 984 } 985 tr, ok := trace.FromContext(stream.Context()) 986 if !ok { 987 return nil 988 } 989 990 trInfo = &traceInfo{ 991 tr: tr, 992 firstLine: firstLine{ 993 client: false, 994 remoteAddr: st.RemoteAddr(), 995 }, 996 } 997 if dl, ok := stream.Context().Deadline(); ok { 998 trInfo.firstLine.deadline = time.Until(dl) 999 } 1000 return trInfo 1001} 1002 1003func (s *Server) addConn(addr string, st transport.ServerTransport) bool { 1004 s.mu.Lock() 1005 defer s.mu.Unlock() 1006 if s.conns == nil { 1007 st.Close() 1008 return false 1009 } 1010 if s.drain { 1011 // Transport added after we drained our existing conns: drain it 1012 // immediately. 1013 st.Drain() 1014 } 1015 1016 if s.conns[addr] == nil { 1017 // Create a map entry if this is the first connection on this listener. 1018 s.conns[addr] = make(map[transport.ServerTransport]bool) 1019 } 1020 s.conns[addr][st] = true 1021 return true 1022} 1023 1024func (s *Server) removeConn(addr string, st transport.ServerTransport) { 1025 s.mu.Lock() 1026 defer s.mu.Unlock() 1027 1028 conns := s.conns[addr] 1029 if conns != nil { 1030 delete(conns, st) 1031 if len(conns) == 0 { 1032 // If the last connection for this address is being removed, also 1033 // remove the map entry corresponding to the address. This is used 1034 // in GracefulStop() when waiting for all connections to be closed. 1035 delete(s.conns, addr) 1036 } 1037 s.cv.Broadcast() 1038 } 1039} 1040 1041func (s *Server) channelzMetric() *channelz.ServerInternalMetric { 1042 return &channelz.ServerInternalMetric{ 1043 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted), 1044 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded), 1045 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed), 1046 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)), 1047 } 1048} 1049 1050func (s *Server) incrCallsStarted() { 1051 atomic.AddInt64(&s.czData.callsStarted, 1) 1052 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano()) 1053} 1054 1055func (s *Server) incrCallsSucceeded() { 1056 atomic.AddInt64(&s.czData.callsSucceeded, 1) 1057} 1058 1059func (s *Server) incrCallsFailed() { 1060 atomic.AddInt64(&s.czData.callsFailed, 1) 1061} 1062 1063func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { 1064 data, err := encode(s.getCodec(stream.ContentSubtype()), msg) 1065 if err != nil { 1066 channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err) 1067 return err 1068 } 1069 compData, err := compress(data, cp, comp) 1070 if err != nil { 1071 channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err) 1072 return err 1073 } 1074 hdr, payload := msgHeader(data, compData) 1075 // TODO(dfawley): should we be checking len(data) instead? 1076 if len(payload) > s.opts.maxSendMessageSize { 1077 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) 1078 } 1079 err = t.Write(stream, hdr, payload, opts) 1080 if err == nil && s.opts.statsHandler != nil { 1081 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) 1082 } 1083 return err 1084} 1085 1086// chainUnaryServerInterceptors chains all unary server interceptors into one. 1087func chainUnaryServerInterceptors(s *Server) { 1088 // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will 1089 // be executed before any other chained interceptors. 1090 interceptors := s.opts.chainUnaryInts 1091 if s.opts.unaryInt != nil { 1092 interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...) 1093 } 1094 1095 var chainedInt UnaryServerInterceptor 1096 if len(interceptors) == 0 { 1097 chainedInt = nil 1098 } else if len(interceptors) == 1 { 1099 chainedInt = interceptors[0] 1100 } else { 1101 chainedInt = chainUnaryInterceptors(interceptors) 1102 } 1103 1104 s.opts.unaryInt = chainedInt 1105} 1106 1107func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor { 1108 return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) { 1109 var i int 1110 var next UnaryHandler 1111 next = func(ctx context.Context, req interface{}) (interface{}, error) { 1112 if i == len(interceptors)-1 { 1113 return interceptors[i](ctx, req, info, handler) 1114 } 1115 i++ 1116 return interceptors[i-1](ctx, req, info, next) 1117 } 1118 return next(ctx, req) 1119 } 1120} 1121 1122func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { 1123 sh := s.opts.statsHandler 1124 if sh != nil || trInfo != nil || channelz.IsOn() { 1125 if channelz.IsOn() { 1126 s.incrCallsStarted() 1127 } 1128 var statsBegin *stats.Begin 1129 if sh != nil { 1130 beginTime := time.Now() 1131 statsBegin = &stats.Begin{ 1132 BeginTime: beginTime, 1133 IsClientStream: false, 1134 IsServerStream: false, 1135 } 1136 sh.HandleRPC(stream.Context(), statsBegin) 1137 } 1138 if trInfo != nil { 1139 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1140 } 1141 // The deferred error handling for tracing, stats handler and channelz are 1142 // combined into one function to reduce stack usage -- a defer takes ~56-64 1143 // bytes on the stack, so overflowing the stack will require a stack 1144 // re-allocation, which is expensive. 1145 // 1146 // To maintain behavior similar to separate deferred statements, statements 1147 // should be executed in the reverse order. That is, tracing first, stats 1148 // handler second, and channelz last. Note that panics *within* defers will 1149 // lead to different behavior, but that's an acceptable compromise; that 1150 // would be undefined behavior territory anyway. 1151 defer func() { 1152 if trInfo != nil { 1153 if err != nil && err != io.EOF { 1154 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1155 trInfo.tr.SetError() 1156 } 1157 trInfo.tr.Finish() 1158 } 1159 1160 if sh != nil { 1161 end := &stats.End{ 1162 BeginTime: statsBegin.BeginTime, 1163 EndTime: time.Now(), 1164 } 1165 if err != nil && err != io.EOF { 1166 end.Error = toRPCErr(err) 1167 } 1168 sh.HandleRPC(stream.Context(), end) 1169 } 1170 1171 if channelz.IsOn() { 1172 if err != nil && err != io.EOF { 1173 s.incrCallsFailed() 1174 } else { 1175 s.incrCallsSucceeded() 1176 } 1177 } 1178 }() 1179 } 1180 1181 binlog := binarylog.GetMethodLogger(stream.Method()) 1182 if binlog != nil { 1183 ctx := stream.Context() 1184 md, _ := metadata.FromIncomingContext(ctx) 1185 logEntry := &binarylog.ClientHeader{ 1186 Header: md, 1187 MethodName: stream.Method(), 1188 PeerAddr: nil, 1189 } 1190 if deadline, ok := ctx.Deadline(); ok { 1191 logEntry.Timeout = time.Until(deadline) 1192 if logEntry.Timeout < 0 { 1193 logEntry.Timeout = 0 1194 } 1195 } 1196 if a := md[":authority"]; len(a) > 0 { 1197 logEntry.Authority = a[0] 1198 } 1199 if peer, ok := peer.FromContext(ctx); ok { 1200 logEntry.PeerAddr = peer.Addr 1201 } 1202 binlog.Log(logEntry) 1203 } 1204 1205 // comp and cp are used for compression. decomp and dc are used for 1206 // decompression. If comp and decomp are both set, they are the same; 1207 // however they are kept separate to ensure that at most one of the 1208 // compressor/decompressor variable pairs are set for use later. 1209 var comp, decomp encoding.Compressor 1210 var cp Compressor 1211 var dc Decompressor 1212 1213 // If dc is set and matches the stream's compression, use it. Otherwise, try 1214 // to find a matching registered compressor for decomp. 1215 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1216 dc = s.opts.dc 1217 } else if rc != "" && rc != encoding.Identity { 1218 decomp = encoding.GetCompressor(rc) 1219 if decomp == nil { 1220 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1221 t.WriteStatus(stream, st) 1222 return st.Err() 1223 } 1224 } 1225 1226 // If cp is set, use it. Otherwise, attempt to compress the response using 1227 // the incoming message compression method. 1228 // 1229 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1230 if s.opts.cp != nil { 1231 cp = s.opts.cp 1232 stream.SetSendCompress(cp.Type()) 1233 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1234 // Legacy compressor not specified; attempt to respond with same encoding. 1235 comp = encoding.GetCompressor(rc) 1236 if comp != nil { 1237 stream.SetSendCompress(rc) 1238 } 1239 } 1240 1241 var payInfo *payloadInfo 1242 if sh != nil || binlog != nil { 1243 payInfo = &payloadInfo{} 1244 } 1245 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) 1246 if err != nil { 1247 if e := t.WriteStatus(stream, status.Convert(err)); e != nil { 1248 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e) 1249 } 1250 return err 1251 } 1252 if channelz.IsOn() { 1253 t.IncrMsgRecv() 1254 } 1255 df := func(v interface{}) error { 1256 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { 1257 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) 1258 } 1259 if sh != nil { 1260 sh.HandleRPC(stream.Context(), &stats.InPayload{ 1261 RecvTime: time.Now(), 1262 Payload: v, 1263 WireLength: payInfo.wireLength + headerLen, 1264 Data: d, 1265 Length: len(d), 1266 }) 1267 } 1268 if binlog != nil { 1269 binlog.Log(&binarylog.ClientMessage{ 1270 Message: d, 1271 }) 1272 } 1273 if trInfo != nil { 1274 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) 1275 } 1276 return nil 1277 } 1278 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1279 reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt) 1280 if appErr != nil { 1281 appStatus, ok := status.FromError(appErr) 1282 if !ok { 1283 // Convert appErr if it is not a grpc status error. 1284 appErr = status.Error(codes.Unknown, appErr.Error()) 1285 appStatus, _ = status.FromError(appErr) 1286 } 1287 if trInfo != nil { 1288 trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1289 trInfo.tr.SetError() 1290 } 1291 if e := t.WriteStatus(stream, appStatus); e != nil { 1292 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) 1293 } 1294 if binlog != nil { 1295 if h, _ := stream.Header(); h.Len() > 0 { 1296 // Only log serverHeader if there was header. Otherwise it can 1297 // be trailer only. 1298 binlog.Log(&binarylog.ServerHeader{ 1299 Header: h, 1300 }) 1301 } 1302 binlog.Log(&binarylog.ServerTrailer{ 1303 Trailer: stream.Trailer(), 1304 Err: appErr, 1305 }) 1306 } 1307 return appErr 1308 } 1309 if trInfo != nil { 1310 trInfo.tr.LazyLog(stringer("OK"), false) 1311 } 1312 opts := &transport.Options{Last: true} 1313 1314 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { 1315 if err == io.EOF { 1316 // The entire stream is done (for unary RPC only). 1317 return err 1318 } 1319 if sts, ok := status.FromError(err); ok { 1320 if e := t.WriteStatus(stream, sts); e != nil { 1321 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) 1322 } 1323 } else { 1324 switch st := err.(type) { 1325 case transport.ConnectionError: 1326 // Nothing to do here. 1327 default: 1328 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) 1329 } 1330 } 1331 if binlog != nil { 1332 h, _ := stream.Header() 1333 binlog.Log(&binarylog.ServerHeader{ 1334 Header: h, 1335 }) 1336 binlog.Log(&binarylog.ServerTrailer{ 1337 Trailer: stream.Trailer(), 1338 Err: appErr, 1339 }) 1340 } 1341 return err 1342 } 1343 if binlog != nil { 1344 h, _ := stream.Header() 1345 binlog.Log(&binarylog.ServerHeader{ 1346 Header: h, 1347 }) 1348 binlog.Log(&binarylog.ServerMessage{ 1349 Message: reply, 1350 }) 1351 } 1352 if channelz.IsOn() { 1353 t.IncrMsgSent() 1354 } 1355 if trInfo != nil { 1356 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) 1357 } 1358 // TODO: Should we be logging if writing status failed here, like above? 1359 // Should the logging be in WriteStatus? Should we ignore the WriteStatus 1360 // error or allow the stats handler to see it? 1361 err = t.WriteStatus(stream, statusOK) 1362 if binlog != nil { 1363 binlog.Log(&binarylog.ServerTrailer{ 1364 Trailer: stream.Trailer(), 1365 Err: appErr, 1366 }) 1367 } 1368 return err 1369} 1370 1371// chainStreamServerInterceptors chains all stream server interceptors into one. 1372func chainStreamServerInterceptors(s *Server) { 1373 // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will 1374 // be executed before any other chained interceptors. 1375 interceptors := s.opts.chainStreamInts 1376 if s.opts.streamInt != nil { 1377 interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...) 1378 } 1379 1380 var chainedInt StreamServerInterceptor 1381 if len(interceptors) == 0 { 1382 chainedInt = nil 1383 } else if len(interceptors) == 1 { 1384 chainedInt = interceptors[0] 1385 } else { 1386 chainedInt = chainStreamInterceptors(interceptors) 1387 } 1388 1389 s.opts.streamInt = chainedInt 1390} 1391 1392func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor { 1393 return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error { 1394 var i int 1395 var next StreamHandler 1396 next = func(srv interface{}, ss ServerStream) error { 1397 if i == len(interceptors)-1 { 1398 return interceptors[i](srv, ss, info, handler) 1399 } 1400 i++ 1401 return interceptors[i-1](srv, ss, info, next) 1402 } 1403 return next(srv, ss) 1404 } 1405} 1406 1407func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) { 1408 if channelz.IsOn() { 1409 s.incrCallsStarted() 1410 } 1411 sh := s.opts.statsHandler 1412 var statsBegin *stats.Begin 1413 if sh != nil { 1414 beginTime := time.Now() 1415 statsBegin = &stats.Begin{ 1416 BeginTime: beginTime, 1417 IsClientStream: sd.ClientStreams, 1418 IsServerStream: sd.ServerStreams, 1419 } 1420 sh.HandleRPC(stream.Context(), statsBegin) 1421 } 1422 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1423 ss := &serverStream{ 1424 ctx: ctx, 1425 t: t, 1426 s: stream, 1427 p: &parser{r: stream}, 1428 codec: s.getCodec(stream.ContentSubtype()), 1429 maxReceiveMessageSize: s.opts.maxReceiveMessageSize, 1430 maxSendMessageSize: s.opts.maxSendMessageSize, 1431 trInfo: trInfo, 1432 statsHandler: sh, 1433 } 1434 1435 if sh != nil || trInfo != nil || channelz.IsOn() { 1436 // See comment in processUnaryRPC on defers. 1437 defer func() { 1438 if trInfo != nil { 1439 ss.mu.Lock() 1440 if err != nil && err != io.EOF { 1441 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1442 ss.trInfo.tr.SetError() 1443 } 1444 ss.trInfo.tr.Finish() 1445 ss.trInfo.tr = nil 1446 ss.mu.Unlock() 1447 } 1448 1449 if sh != nil { 1450 end := &stats.End{ 1451 BeginTime: statsBegin.BeginTime, 1452 EndTime: time.Now(), 1453 } 1454 if err != nil && err != io.EOF { 1455 end.Error = toRPCErr(err) 1456 } 1457 sh.HandleRPC(stream.Context(), end) 1458 } 1459 1460 if channelz.IsOn() { 1461 if err != nil && err != io.EOF { 1462 s.incrCallsFailed() 1463 } else { 1464 s.incrCallsSucceeded() 1465 } 1466 } 1467 }() 1468 } 1469 1470 ss.binlog = binarylog.GetMethodLogger(stream.Method()) 1471 if ss.binlog != nil { 1472 md, _ := metadata.FromIncomingContext(ctx) 1473 logEntry := &binarylog.ClientHeader{ 1474 Header: md, 1475 MethodName: stream.Method(), 1476 PeerAddr: nil, 1477 } 1478 if deadline, ok := ctx.Deadline(); ok { 1479 logEntry.Timeout = time.Until(deadline) 1480 if logEntry.Timeout < 0 { 1481 logEntry.Timeout = 0 1482 } 1483 } 1484 if a := md[":authority"]; len(a) > 0 { 1485 logEntry.Authority = a[0] 1486 } 1487 if peer, ok := peer.FromContext(ss.Context()); ok { 1488 logEntry.PeerAddr = peer.Addr 1489 } 1490 ss.binlog.Log(logEntry) 1491 } 1492 1493 // If dc is set and matches the stream's compression, use it. Otherwise, try 1494 // to find a matching registered compressor for decomp. 1495 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1496 ss.dc = s.opts.dc 1497 } else if rc != "" && rc != encoding.Identity { 1498 ss.decomp = encoding.GetCompressor(rc) 1499 if ss.decomp == nil { 1500 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1501 t.WriteStatus(ss.s, st) 1502 return st.Err() 1503 } 1504 } 1505 1506 // If cp is set, use it. Otherwise, attempt to compress the response using 1507 // the incoming message compression method. 1508 // 1509 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1510 if s.opts.cp != nil { 1511 ss.cp = s.opts.cp 1512 stream.SetSendCompress(s.opts.cp.Type()) 1513 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1514 // Legacy compressor not specified; attempt to respond with same encoding. 1515 ss.comp = encoding.GetCompressor(rc) 1516 if ss.comp != nil { 1517 stream.SetSendCompress(rc) 1518 } 1519 } 1520 1521 ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp) 1522 1523 if trInfo != nil { 1524 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1525 } 1526 var appErr error 1527 var server interface{} 1528 if info != nil { 1529 server = info.serviceImpl 1530 } 1531 if s.opts.streamInt == nil { 1532 appErr = sd.Handler(server, ss) 1533 } else { 1534 info := &StreamServerInfo{ 1535 FullMethod: stream.Method(), 1536 IsClientStream: sd.ClientStreams, 1537 IsServerStream: sd.ServerStreams, 1538 } 1539 appErr = s.opts.streamInt(server, ss, info, sd.Handler) 1540 } 1541 if appErr != nil { 1542 appStatus, ok := status.FromError(appErr) 1543 if !ok { 1544 appStatus = status.New(codes.Unknown, appErr.Error()) 1545 appErr = appStatus.Err() 1546 } 1547 if trInfo != nil { 1548 ss.mu.Lock() 1549 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1550 ss.trInfo.tr.SetError() 1551 ss.mu.Unlock() 1552 } 1553 t.WriteStatus(ss.s, appStatus) 1554 if ss.binlog != nil { 1555 ss.binlog.Log(&binarylog.ServerTrailer{ 1556 Trailer: ss.s.Trailer(), 1557 Err: appErr, 1558 }) 1559 } 1560 // TODO: Should we log an error from WriteStatus here and below? 1561 return appErr 1562 } 1563 if trInfo != nil { 1564 ss.mu.Lock() 1565 ss.trInfo.tr.LazyLog(stringer("OK"), false) 1566 ss.mu.Unlock() 1567 } 1568 err = t.WriteStatus(ss.s, statusOK) 1569 if ss.binlog != nil { 1570 ss.binlog.Log(&binarylog.ServerTrailer{ 1571 Trailer: ss.s.Trailer(), 1572 Err: appErr, 1573 }) 1574 } 1575 return err 1576} 1577 1578func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { 1579 sm := stream.Method() 1580 if sm != "" && sm[0] == '/' { 1581 sm = sm[1:] 1582 } 1583 pos := strings.LastIndex(sm, "/") 1584 if pos == -1 { 1585 if trInfo != nil { 1586 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) 1587 trInfo.tr.SetError() 1588 } 1589 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) 1590 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1591 if trInfo != nil { 1592 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1593 trInfo.tr.SetError() 1594 } 1595 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) 1596 } 1597 if trInfo != nil { 1598 trInfo.tr.Finish() 1599 } 1600 return 1601 } 1602 service := sm[:pos] 1603 method := sm[pos+1:] 1604 1605 srv, knownService := s.services[service] 1606 if knownService { 1607 if md, ok := srv.methods[method]; ok { 1608 s.processUnaryRPC(t, stream, srv, md, trInfo) 1609 return 1610 } 1611 if sd, ok := srv.streams[method]; ok { 1612 s.processStreamingRPC(t, stream, srv, sd, trInfo) 1613 return 1614 } 1615 } 1616 // Unknown service, or known server unknown method. 1617 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1618 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1619 return 1620 } 1621 var errDesc string 1622 if !knownService { 1623 errDesc = fmt.Sprintf("unknown service %v", service) 1624 } else { 1625 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service) 1626 } 1627 if trInfo != nil { 1628 trInfo.tr.LazyPrintf("%s", errDesc) 1629 trInfo.tr.SetError() 1630 } 1631 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1632 if trInfo != nil { 1633 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1634 trInfo.tr.SetError() 1635 } 1636 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) 1637 } 1638 if trInfo != nil { 1639 trInfo.tr.Finish() 1640 } 1641} 1642 1643// The key to save ServerTransportStream in the context. 1644type streamKey struct{} 1645 1646// NewContextWithServerTransportStream creates a new context from ctx and 1647// attaches stream to it. 1648// 1649// Experimental 1650// 1651// Notice: This API is EXPERIMENTAL and may be changed or removed in a 1652// later release. 1653func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { 1654 return context.WithValue(ctx, streamKey{}, stream) 1655} 1656 1657// ServerTransportStream is a minimal interface that a transport stream must 1658// implement. This can be used to mock an actual transport stream for tests of 1659// handler code that use, for example, grpc.SetHeader (which requires some 1660// stream to be in context). 1661// 1662// See also NewContextWithServerTransportStream. 1663// 1664// Experimental 1665// 1666// Notice: This type is EXPERIMENTAL and may be changed or removed in a 1667// later release. 1668type ServerTransportStream interface { 1669 Method() string 1670 SetHeader(md metadata.MD) error 1671 SendHeader(md metadata.MD) error 1672 SetTrailer(md metadata.MD) error 1673} 1674 1675// ServerTransportStreamFromContext returns the ServerTransportStream saved in 1676// ctx. Returns nil if the given context has no stream associated with it 1677// (which implies it is not an RPC invocation context). 1678// 1679// Experimental 1680// 1681// Notice: This API is EXPERIMENTAL and may be changed or removed in a 1682// later release. 1683func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { 1684 s, _ := ctx.Value(streamKey{}).(ServerTransportStream) 1685 return s 1686} 1687 1688// Stop stops the gRPC server. It immediately closes all open 1689// connections and listeners. 1690// It cancels all active RPCs on the server side and the corresponding 1691// pending RPCs on the client side will get notified by connection 1692// errors. 1693func (s *Server) Stop() { 1694 s.quit.Fire() 1695 1696 defer func() { 1697 s.serveWG.Wait() 1698 s.done.Fire() 1699 }() 1700 1701 s.channelzRemoveOnce.Do(func() { 1702 if channelz.IsOn() { 1703 channelz.RemoveEntry(s.channelzID) 1704 } 1705 }) 1706 1707 s.mu.Lock() 1708 listeners := s.lis 1709 s.lis = nil 1710 conns := s.conns 1711 s.conns = nil 1712 // interrupt GracefulStop if Stop and GracefulStop are called concurrently. 1713 s.cv.Broadcast() 1714 s.mu.Unlock() 1715 1716 for lis := range listeners { 1717 lis.Close() 1718 } 1719 for _, cs := range conns { 1720 for st := range cs { 1721 st.Close() 1722 } 1723 } 1724 if s.opts.numServerWorkers > 0 { 1725 s.stopServerWorkers() 1726 } 1727 1728 s.mu.Lock() 1729 if s.events != nil { 1730 s.events.Finish() 1731 s.events = nil 1732 } 1733 s.mu.Unlock() 1734} 1735 1736// GracefulStop stops the gRPC server gracefully. It stops the server from 1737// accepting new connections and RPCs and blocks until all the pending RPCs are 1738// finished. 1739func (s *Server) GracefulStop() { 1740 s.quit.Fire() 1741 defer s.done.Fire() 1742 1743 s.channelzRemoveOnce.Do(func() { 1744 if channelz.IsOn() { 1745 channelz.RemoveEntry(s.channelzID) 1746 } 1747 }) 1748 s.mu.Lock() 1749 if s.conns == nil { 1750 s.mu.Unlock() 1751 return 1752 } 1753 1754 for lis := range s.lis { 1755 lis.Close() 1756 } 1757 s.lis = nil 1758 if !s.drain { 1759 for _, conns := range s.conns { 1760 for st := range conns { 1761 st.Drain() 1762 } 1763 } 1764 s.drain = true 1765 } 1766 1767 // Wait for serving threads to be ready to exit. Only then can we be sure no 1768 // new conns will be created. 1769 s.mu.Unlock() 1770 s.serveWG.Wait() 1771 s.mu.Lock() 1772 1773 for len(s.conns) != 0 { 1774 s.cv.Wait() 1775 } 1776 s.conns = nil 1777 if s.events != nil { 1778 s.events.Finish() 1779 s.events = nil 1780 } 1781 s.mu.Unlock() 1782} 1783 1784// contentSubtype must be lowercase 1785// cannot return nil 1786func (s *Server) getCodec(contentSubtype string) baseCodec { 1787 if s.opts.codec != nil { 1788 return s.opts.codec 1789 } 1790 if contentSubtype == "" { 1791 return encoding.GetCodec(proto.Name) 1792 } 1793 codec := encoding.GetCodec(contentSubtype) 1794 if codec == nil { 1795 return encoding.GetCodec(proto.Name) 1796 } 1797 return codec 1798} 1799 1800// SetHeader sets the header metadata. 1801// When called multiple times, all the provided metadata will be merged. 1802// All the metadata will be sent out when one of the following happens: 1803// - grpc.SendHeader() is called; 1804// - The first response is sent out; 1805// - An RPC status is sent out (error or success). 1806func SetHeader(ctx context.Context, md metadata.MD) error { 1807 if md.Len() == 0 { 1808 return nil 1809 } 1810 stream := ServerTransportStreamFromContext(ctx) 1811 if stream == nil { 1812 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1813 } 1814 return stream.SetHeader(md) 1815} 1816 1817// SendHeader sends header metadata. It may be called at most once. 1818// The provided md and headers set by SetHeader() will be sent. 1819func SendHeader(ctx context.Context, md metadata.MD) error { 1820 stream := ServerTransportStreamFromContext(ctx) 1821 if stream == nil { 1822 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1823 } 1824 if err := stream.SendHeader(md); err != nil { 1825 return toRPCErr(err) 1826 } 1827 return nil 1828} 1829 1830// SetTrailer sets the trailer metadata that will be sent when an RPC returns. 1831// When called more than once, all the provided metadata will be merged. 1832func SetTrailer(ctx context.Context, md metadata.MD) error { 1833 if md.Len() == 0 { 1834 return nil 1835 } 1836 stream := ServerTransportStreamFromContext(ctx) 1837 if stream == nil { 1838 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1839 } 1840 return stream.SetTrailer(md) 1841} 1842 1843// Method returns the method string for the server context. The returned 1844// string is in the format of "/service/method". 1845func Method(ctx context.Context) (string, bool) { 1846 s := ServerTransportStreamFromContext(ctx) 1847 if s == nil { 1848 return "", false 1849 } 1850 return s.Method(), true 1851} 1852 1853type channelzServer struct { 1854 s *Server 1855} 1856 1857func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { 1858 return c.s.channelzMetric() 1859} 1860