1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "errors" 24 "fmt" 25 "io" 26 "math" 27 "net" 28 "net/http" 29 "reflect" 30 "runtime" 31 "strings" 32 "sync" 33 "sync/atomic" 34 "time" 35 36 "golang.org/x/net/trace" 37 38 "google.golang.org/grpc/codes" 39 "google.golang.org/grpc/credentials" 40 "google.golang.org/grpc/encoding" 41 "google.golang.org/grpc/encoding/proto" 42 "google.golang.org/grpc/grpclog" 43 "google.golang.org/grpc/internal" 44 "google.golang.org/grpc/internal/binarylog" 45 "google.golang.org/grpc/internal/channelz" 46 "google.golang.org/grpc/internal/grpcrand" 47 "google.golang.org/grpc/internal/grpcsync" 48 "google.golang.org/grpc/internal/transport" 49 "google.golang.org/grpc/keepalive" 50 "google.golang.org/grpc/metadata" 51 "google.golang.org/grpc/peer" 52 "google.golang.org/grpc/stats" 53 "google.golang.org/grpc/status" 54 "google.golang.org/grpc/tap" 55) 56 57const ( 58 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 59 defaultServerMaxSendMessageSize = math.MaxInt32 60 61 // Server transports are tracked in a map which is keyed on listener 62 // address. For regular gRPC traffic, connections are accepted in Serve() 63 // through a call to Accept(), and we use the actual listener address as key 64 // when we add it to the map. But for connections received through 65 // ServeHTTP(), we do not have a listener and hence use this dummy value. 66 listenerAddressForServeHTTP = "listenerAddressForServeHTTP" 67) 68 69func init() { 70 internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials { 71 return srv.opts.creds 72 } 73 internal.DrainServerTransports = func(srv *Server, addr string) { 74 srv.drainServerTransports(addr) 75 } 76} 77 78var statusOK = status.New(codes.OK, "") 79var logger = grpclog.Component("core") 80 81type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) 82 83// MethodDesc represents an RPC service's method specification. 84type MethodDesc struct { 85 MethodName string 86 Handler methodHandler 87} 88 89// ServiceDesc represents an RPC service's specification. 90type ServiceDesc struct { 91 ServiceName string 92 // The pointer to the service interface. Used to check whether the user 93 // provided implementation satisfies the interface requirements. 94 HandlerType interface{} 95 Methods []MethodDesc 96 Streams []StreamDesc 97 Metadata interface{} 98} 99 100// serviceInfo wraps information about a service. It is very similar to 101// ServiceDesc and is constructed from it for internal purposes. 102type serviceInfo struct { 103 // Contains the implementation for the methods in this service. 104 serviceImpl interface{} 105 methods map[string]*MethodDesc 106 streams map[string]*StreamDesc 107 mdata interface{} 108} 109 110type serverWorkerData struct { 111 st transport.ServerTransport 112 wg *sync.WaitGroup 113 stream *transport.Stream 114} 115 116// Server is a gRPC server to serve RPC requests. 117type Server struct { 118 opts serverOptions 119 120 mu sync.Mutex // guards following 121 lis map[net.Listener]bool 122 // conns contains all active server transports. It is a map keyed on a 123 // listener address with the value being the set of active transports 124 // belonging to that listener. 125 conns map[string]map[transport.ServerTransport]bool 126 serve bool 127 drain bool 128 cv *sync.Cond // signaled when connections close for GracefulStop 129 services map[string]*serviceInfo // service name -> service info 130 events trace.EventLog 131 132 quit *grpcsync.Event 133 done *grpcsync.Event 134 channelzRemoveOnce sync.Once 135 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop 136 137 channelzID int64 // channelz unique identification number 138 czData *channelzData 139 140 serverWorkerChannels []chan *serverWorkerData 141} 142 143type serverOptions struct { 144 creds credentials.TransportCredentials 145 codec baseCodec 146 cp Compressor 147 dc Decompressor 148 unaryInt UnaryServerInterceptor 149 streamInt StreamServerInterceptor 150 chainUnaryInts []UnaryServerInterceptor 151 chainStreamInts []StreamServerInterceptor 152 inTapHandle tap.ServerInHandle 153 statsHandler stats.Handler 154 maxConcurrentStreams uint32 155 maxReceiveMessageSize int 156 maxSendMessageSize int 157 unknownStreamDesc *StreamDesc 158 keepaliveParams keepalive.ServerParameters 159 keepalivePolicy keepalive.EnforcementPolicy 160 initialWindowSize int32 161 initialConnWindowSize int32 162 writeBufferSize int 163 readBufferSize int 164 connectionTimeout time.Duration 165 maxHeaderListSize *uint32 166 headerTableSize *uint32 167 numServerWorkers uint32 168} 169 170var defaultServerOptions = serverOptions{ 171 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, 172 maxSendMessageSize: defaultServerMaxSendMessageSize, 173 connectionTimeout: 120 * time.Second, 174 writeBufferSize: defaultWriteBufSize, 175 readBufferSize: defaultReadBufSize, 176} 177 178// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. 179type ServerOption interface { 180 apply(*serverOptions) 181} 182 183// EmptyServerOption does not alter the server configuration. It can be embedded 184// in another structure to build custom server options. 185// 186// Experimental 187// 188// Notice: This type is EXPERIMENTAL and may be changed or removed in a 189// later release. 190type EmptyServerOption struct{} 191 192func (EmptyServerOption) apply(*serverOptions) {} 193 194// funcServerOption wraps a function that modifies serverOptions into an 195// implementation of the ServerOption interface. 196type funcServerOption struct { 197 f func(*serverOptions) 198} 199 200func (fdo *funcServerOption) apply(do *serverOptions) { 201 fdo.f(do) 202} 203 204func newFuncServerOption(f func(*serverOptions)) *funcServerOption { 205 return &funcServerOption{ 206 f: f, 207 } 208} 209 210// WriteBufferSize determines how much data can be batched before doing a write on the wire. 211// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. 212// The default value for this buffer is 32KB. 213// Zero will disable the write buffer such that each write will be on underlying connection. 214// Note: A Send call may not directly translate to a write. 215func WriteBufferSize(s int) ServerOption { 216 return newFuncServerOption(func(o *serverOptions) { 217 o.writeBufferSize = s 218 }) 219} 220 221// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 222// for one read syscall. 223// The default value for this buffer is 32KB. 224// Zero will disable read buffer for a connection so data framer can access the underlying 225// conn directly. 226func ReadBufferSize(s int) ServerOption { 227 return newFuncServerOption(func(o *serverOptions) { 228 o.readBufferSize = s 229 }) 230} 231 232// InitialWindowSize returns a ServerOption that sets window size for stream. 233// The lower bound for window size is 64K and any value smaller than that will be ignored. 234func InitialWindowSize(s int32) ServerOption { 235 return newFuncServerOption(func(o *serverOptions) { 236 o.initialWindowSize = s 237 }) 238} 239 240// InitialConnWindowSize returns a ServerOption that sets window size for a connection. 241// The lower bound for window size is 64K and any value smaller than that will be ignored. 242func InitialConnWindowSize(s int32) ServerOption { 243 return newFuncServerOption(func(o *serverOptions) { 244 o.initialConnWindowSize = s 245 }) 246} 247 248// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 249func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 250 if kp.Time > 0 && kp.Time < time.Second { 251 logger.Warning("Adjusting keepalive ping interval to minimum period of 1s") 252 kp.Time = time.Second 253 } 254 255 return newFuncServerOption(func(o *serverOptions) { 256 o.keepaliveParams = kp 257 }) 258} 259 260// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 261func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 262 return newFuncServerOption(func(o *serverOptions) { 263 o.keepalivePolicy = kep 264 }) 265} 266 267// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 268// 269// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 270// 271// Deprecated: register codecs using encoding.RegisterCodec. The server will 272// automatically use registered codecs based on the incoming requests' headers. 273// See also 274// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. 275// Will be supported throughout 1.x. 276func CustomCodec(codec Codec) ServerOption { 277 return newFuncServerOption(func(o *serverOptions) { 278 o.codec = codec 279 }) 280} 281 282// ForceServerCodec returns a ServerOption that sets a codec for message 283// marshaling and unmarshaling. 284// 285// This will override any lookups by content-subtype for Codecs registered 286// with RegisterCodec. 287// 288// See Content-Type on 289// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for 290// more details. Also see the documentation on RegisterCodec and 291// CallContentSubtype for more details on the interaction between encoding.Codec 292// and content-subtype. 293// 294// This function is provided for advanced users; prefer to register codecs 295// using encoding.RegisterCodec. 296// The server will automatically use registered codecs based on the incoming 297// requests' headers. See also 298// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. 299// Will be supported throughout 1.x. 300// 301// Experimental 302// 303// Notice: This API is EXPERIMENTAL and may be changed or removed in a 304// later release. 305func ForceServerCodec(codec encoding.Codec) ServerOption { 306 return newFuncServerOption(func(o *serverOptions) { 307 o.codec = codec 308 }) 309} 310 311// RPCCompressor returns a ServerOption that sets a compressor for outbound 312// messages. For backward compatibility, all outbound messages will be sent 313// using this compressor, regardless of incoming message compression. By 314// default, server messages will be sent using the same compressor with which 315// request messages were sent. 316// 317// Deprecated: use encoding.RegisterCompressor instead. Will be supported 318// throughout 1.x. 319func RPCCompressor(cp Compressor) ServerOption { 320 return newFuncServerOption(func(o *serverOptions) { 321 o.cp = cp 322 }) 323} 324 325// RPCDecompressor returns a ServerOption that sets a decompressor for inbound 326// messages. It has higher priority than decompressors registered via 327// encoding.RegisterCompressor. 328// 329// Deprecated: use encoding.RegisterCompressor instead. Will be supported 330// throughout 1.x. 331func RPCDecompressor(dc Decompressor) ServerOption { 332 return newFuncServerOption(func(o *serverOptions) { 333 o.dc = dc 334 }) 335} 336 337// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 338// If this is not set, gRPC uses the default limit. 339// 340// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x. 341func MaxMsgSize(m int) ServerOption { 342 return MaxRecvMsgSize(m) 343} 344 345// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 346// If this is not set, gRPC uses the default 4MB. 347func MaxRecvMsgSize(m int) ServerOption { 348 return newFuncServerOption(func(o *serverOptions) { 349 o.maxReceiveMessageSize = m 350 }) 351} 352 353// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 354// If this is not set, gRPC uses the default `math.MaxInt32`. 355func MaxSendMsgSize(m int) ServerOption { 356 return newFuncServerOption(func(o *serverOptions) { 357 o.maxSendMessageSize = m 358 }) 359} 360 361// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 362// of concurrent streams to each ServerTransport. 363func MaxConcurrentStreams(n uint32) ServerOption { 364 return newFuncServerOption(func(o *serverOptions) { 365 o.maxConcurrentStreams = n 366 }) 367} 368 369// Creds returns a ServerOption that sets credentials for server connections. 370func Creds(c credentials.TransportCredentials) ServerOption { 371 return newFuncServerOption(func(o *serverOptions) { 372 o.creds = c 373 }) 374} 375 376// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 377// server. Only one unary interceptor can be installed. The construction of multiple 378// interceptors (e.g., chaining) can be implemented at the caller. 379func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { 380 return newFuncServerOption(func(o *serverOptions) { 381 if o.unaryInt != nil { 382 panic("The unary server interceptor was already set and may not be reset.") 383 } 384 o.unaryInt = i 385 }) 386} 387 388// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor 389// for unary RPCs. The first interceptor will be the outer most, 390// while the last interceptor will be the inner most wrapper around the real call. 391// All unary interceptors added by this method will be chained. 392func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption { 393 return newFuncServerOption(func(o *serverOptions) { 394 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...) 395 }) 396} 397 398// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the 399// server. Only one stream interceptor can be installed. 400func StreamInterceptor(i StreamServerInterceptor) ServerOption { 401 return newFuncServerOption(func(o *serverOptions) { 402 if o.streamInt != nil { 403 panic("The stream server interceptor was already set and may not be reset.") 404 } 405 o.streamInt = i 406 }) 407} 408 409// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor 410// for streaming RPCs. The first interceptor will be the outer most, 411// while the last interceptor will be the inner most wrapper around the real call. 412// All stream interceptors added by this method will be chained. 413func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption { 414 return newFuncServerOption(func(o *serverOptions) { 415 o.chainStreamInts = append(o.chainStreamInts, interceptors...) 416 }) 417} 418 419// InTapHandle returns a ServerOption that sets the tap handle for all the server 420// transport to be created. Only one can be installed. 421// 422// Experimental 423// 424// Notice: This API is EXPERIMENTAL and may be changed or removed in a 425// later release. 426func InTapHandle(h tap.ServerInHandle) ServerOption { 427 return newFuncServerOption(func(o *serverOptions) { 428 if o.inTapHandle != nil { 429 panic("The tap handle was already set and may not be reset.") 430 } 431 o.inTapHandle = h 432 }) 433} 434 435// StatsHandler returns a ServerOption that sets the stats handler for the server. 436func StatsHandler(h stats.Handler) ServerOption { 437 return newFuncServerOption(func(o *serverOptions) { 438 o.statsHandler = h 439 }) 440} 441 442// UnknownServiceHandler returns a ServerOption that allows for adding a custom 443// unknown service handler. The provided method is a bidi-streaming RPC service 444// handler that will be invoked instead of returning the "unimplemented" gRPC 445// error whenever a request is received for an unregistered service or method. 446// The handling function and stream interceptor (if set) have full access to 447// the ServerStream, including its Context. 448func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 449 return newFuncServerOption(func(o *serverOptions) { 450 o.unknownStreamDesc = &StreamDesc{ 451 StreamName: "unknown_service_handler", 452 Handler: streamHandler, 453 // We need to assume that the users of the streamHandler will want to use both. 454 ClientStreams: true, 455 ServerStreams: true, 456 } 457 }) 458} 459 460// ConnectionTimeout returns a ServerOption that sets the timeout for 461// connection establishment (up to and including HTTP/2 handshaking) for all 462// new connections. If this is not set, the default is 120 seconds. A zero or 463// negative value will result in an immediate timeout. 464// 465// Experimental 466// 467// Notice: This API is EXPERIMENTAL and may be changed or removed in a 468// later release. 469func ConnectionTimeout(d time.Duration) ServerOption { 470 return newFuncServerOption(func(o *serverOptions) { 471 o.connectionTimeout = d 472 }) 473} 474 475// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size 476// of header list that the server is prepared to accept. 477func MaxHeaderListSize(s uint32) ServerOption { 478 return newFuncServerOption(func(o *serverOptions) { 479 o.maxHeaderListSize = &s 480 }) 481} 482 483// HeaderTableSize returns a ServerOption that sets the size of dynamic 484// header table for stream. 485// 486// Experimental 487// 488// Notice: This API is EXPERIMENTAL and may be changed or removed in a 489// later release. 490func HeaderTableSize(s uint32) ServerOption { 491 return newFuncServerOption(func(o *serverOptions) { 492 o.headerTableSize = &s 493 }) 494} 495 496// NumStreamWorkers returns a ServerOption that sets the number of worker 497// goroutines that should be used to process incoming streams. Setting this to 498// zero (default) will disable workers and spawn a new goroutine for each 499// stream. 500// 501// Experimental 502// 503// Notice: This API is EXPERIMENTAL and may be changed or removed in a 504// later release. 505func NumStreamWorkers(numServerWorkers uint32) ServerOption { 506 // TODO: If/when this API gets stabilized (i.e. stream workers become the 507 // only way streams are processed), change the behavior of the zero value to 508 // a sane default. Preliminary experiments suggest that a value equal to the 509 // number of CPUs available is most performant; requires thorough testing. 510 return newFuncServerOption(func(o *serverOptions) { 511 o.numServerWorkers = numServerWorkers 512 }) 513} 514 515// serverWorkerResetThreshold defines how often the stack must be reset. Every 516// N requests, by spawning a new goroutine in its place, a worker can reset its 517// stack so that large stacks don't live in memory forever. 2^16 should allow 518// each goroutine stack to live for at least a few seconds in a typical 519// workload (assuming a QPS of a few thousand requests/sec). 520const serverWorkerResetThreshold = 1 << 16 521 522// serverWorkers blocks on a *transport.Stream channel forever and waits for 523// data to be fed by serveStreams. This allows different requests to be 524// processed by the same goroutine, removing the need for expensive stack 525// re-allocations (see the runtime.morestack problem [1]). 526// 527// [1] https://github.com/golang/go/issues/18138 528func (s *Server) serverWorker(ch chan *serverWorkerData) { 529 // To make sure all server workers don't reset at the same time, choose a 530 // random number of iterations before resetting. 531 threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold) 532 for completed := 0; completed < threshold; completed++ { 533 data, ok := <-ch 534 if !ok { 535 return 536 } 537 s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) 538 data.wg.Done() 539 } 540 go s.serverWorker(ch) 541} 542 543// initServerWorkers creates worker goroutines and channels to process incoming 544// connections to reduce the time spent overall on runtime.morestack. 545func (s *Server) initServerWorkers() { 546 s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers) 547 for i := uint32(0); i < s.opts.numServerWorkers; i++ { 548 s.serverWorkerChannels[i] = make(chan *serverWorkerData) 549 go s.serverWorker(s.serverWorkerChannels[i]) 550 } 551} 552 553func (s *Server) stopServerWorkers() { 554 for i := uint32(0); i < s.opts.numServerWorkers; i++ { 555 close(s.serverWorkerChannels[i]) 556 } 557} 558 559// NewServer creates a gRPC server which has no service registered and has not 560// started to accept requests yet. 561func NewServer(opt ...ServerOption) *Server { 562 opts := defaultServerOptions 563 for _, o := range opt { 564 o.apply(&opts) 565 } 566 s := &Server{ 567 lis: make(map[net.Listener]bool), 568 opts: opts, 569 conns: make(map[string]map[transport.ServerTransport]bool), 570 services: make(map[string]*serviceInfo), 571 quit: grpcsync.NewEvent(), 572 done: grpcsync.NewEvent(), 573 czData: new(channelzData), 574 } 575 chainUnaryServerInterceptors(s) 576 chainStreamServerInterceptors(s) 577 s.cv = sync.NewCond(&s.mu) 578 if EnableTracing { 579 _, file, line, _ := runtime.Caller(1) 580 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 581 } 582 583 if s.opts.numServerWorkers > 0 { 584 s.initServerWorkers() 585 } 586 587 if channelz.IsOn() { 588 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") 589 } 590 return s 591} 592 593// printf records an event in s's event log, unless s has been stopped. 594// REQUIRES s.mu is held. 595func (s *Server) printf(format string, a ...interface{}) { 596 if s.events != nil { 597 s.events.Printf(format, a...) 598 } 599} 600 601// errorf records an error in s's event log, unless s has been stopped. 602// REQUIRES s.mu is held. 603func (s *Server) errorf(format string, a ...interface{}) { 604 if s.events != nil { 605 s.events.Errorf(format, a...) 606 } 607} 608 609// ServiceRegistrar wraps a single method that supports service registration. It 610// enables users to pass concrete types other than grpc.Server to the service 611// registration methods exported by the IDL generated code. 612type ServiceRegistrar interface { 613 // RegisterService registers a service and its implementation to the 614 // concrete type implementing this interface. It may not be called 615 // once the server has started serving. 616 // desc describes the service and its methods and handlers. impl is the 617 // service implementation which is passed to the method handlers. 618 RegisterService(desc *ServiceDesc, impl interface{}) 619} 620 621// RegisterService registers a service and its implementation to the gRPC 622// server. It is called from the IDL generated code. This must be called before 623// invoking Serve. If ss is non-nil (for legacy code), its type is checked to 624// ensure it implements sd.HandlerType. 625func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { 626 if ss != nil { 627 ht := reflect.TypeOf(sd.HandlerType).Elem() 628 st := reflect.TypeOf(ss) 629 if !st.Implements(ht) { 630 logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) 631 } 632 } 633 s.register(sd, ss) 634} 635 636func (s *Server) register(sd *ServiceDesc, ss interface{}) { 637 s.mu.Lock() 638 defer s.mu.Unlock() 639 s.printf("RegisterService(%q)", sd.ServiceName) 640 if s.serve { 641 logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) 642 } 643 if _, ok := s.services[sd.ServiceName]; ok { 644 logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) 645 } 646 info := &serviceInfo{ 647 serviceImpl: ss, 648 methods: make(map[string]*MethodDesc), 649 streams: make(map[string]*StreamDesc), 650 mdata: sd.Metadata, 651 } 652 for i := range sd.Methods { 653 d := &sd.Methods[i] 654 info.methods[d.MethodName] = d 655 } 656 for i := range sd.Streams { 657 d := &sd.Streams[i] 658 info.streams[d.StreamName] = d 659 } 660 s.services[sd.ServiceName] = info 661} 662 663// MethodInfo contains the information of an RPC including its method name and type. 664type MethodInfo struct { 665 // Name is the method name only, without the service name or package name. 666 Name string 667 // IsClientStream indicates whether the RPC is a client streaming RPC. 668 IsClientStream bool 669 // IsServerStream indicates whether the RPC is a server streaming RPC. 670 IsServerStream bool 671} 672 673// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. 674type ServiceInfo struct { 675 Methods []MethodInfo 676 // Metadata is the metadata specified in ServiceDesc when registering service. 677 Metadata interface{} 678} 679 680// GetServiceInfo returns a map from service names to ServiceInfo. 681// Service names include the package names, in the form of <package>.<service>. 682func (s *Server) GetServiceInfo() map[string]ServiceInfo { 683 ret := make(map[string]ServiceInfo) 684 for n, srv := range s.services { 685 methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams)) 686 for m := range srv.methods { 687 methods = append(methods, MethodInfo{ 688 Name: m, 689 IsClientStream: false, 690 IsServerStream: false, 691 }) 692 } 693 for m, d := range srv.streams { 694 methods = append(methods, MethodInfo{ 695 Name: m, 696 IsClientStream: d.ClientStreams, 697 IsServerStream: d.ServerStreams, 698 }) 699 } 700 701 ret[n] = ServiceInfo{ 702 Methods: methods, 703 Metadata: srv.mdata, 704 } 705 } 706 return ret 707} 708 709// ErrServerStopped indicates that the operation is now illegal because of 710// the server being stopped. 711var ErrServerStopped = errors.New("grpc: the server has been stopped") 712 713type listenSocket struct { 714 net.Listener 715 channelzID int64 716} 717 718func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { 719 return &channelz.SocketInternalMetric{ 720 SocketOptions: channelz.GetSocketOption(l.Listener), 721 LocalAddr: l.Listener.Addr(), 722 } 723} 724 725func (l *listenSocket) Close() error { 726 err := l.Listener.Close() 727 if channelz.IsOn() { 728 channelz.RemoveEntry(l.channelzID) 729 } 730 return err 731} 732 733// Serve accepts incoming connections on the listener lis, creating a new 734// ServerTransport and service goroutine for each. The service goroutines 735// read gRPC requests and then call the registered handlers to reply to them. 736// Serve returns when lis.Accept fails with fatal errors. lis will be closed when 737// this method returns. 738// Serve will return a non-nil error unless Stop or GracefulStop is called. 739func (s *Server) Serve(lis net.Listener) error { 740 s.mu.Lock() 741 s.printf("serving") 742 s.serve = true 743 if s.lis == nil { 744 // Serve called after Stop or GracefulStop. 745 s.mu.Unlock() 746 lis.Close() 747 return ErrServerStopped 748 } 749 750 s.serveWG.Add(1) 751 defer func() { 752 s.serveWG.Done() 753 if s.quit.HasFired() { 754 // Stop or GracefulStop called; block until done and return nil. 755 <-s.done.Done() 756 } 757 }() 758 759 ls := &listenSocket{Listener: lis} 760 s.lis[ls] = true 761 762 if channelz.IsOn() { 763 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String()) 764 } 765 s.mu.Unlock() 766 767 defer func() { 768 s.mu.Lock() 769 if s.lis != nil && s.lis[ls] { 770 ls.Close() 771 delete(s.lis, ls) 772 } 773 s.mu.Unlock() 774 }() 775 776 var tempDelay time.Duration // how long to sleep on accept failure 777 778 for { 779 rawConn, err := lis.Accept() 780 if err != nil { 781 if ne, ok := err.(interface { 782 Temporary() bool 783 }); ok && ne.Temporary() { 784 if tempDelay == 0 { 785 tempDelay = 5 * time.Millisecond 786 } else { 787 tempDelay *= 2 788 } 789 if max := 1 * time.Second; tempDelay > max { 790 tempDelay = max 791 } 792 s.mu.Lock() 793 s.printf("Accept error: %v; retrying in %v", err, tempDelay) 794 s.mu.Unlock() 795 timer := time.NewTimer(tempDelay) 796 select { 797 case <-timer.C: 798 case <-s.quit.Done(): 799 timer.Stop() 800 return nil 801 } 802 continue 803 } 804 s.mu.Lock() 805 s.printf("done serving; Accept = %v", err) 806 s.mu.Unlock() 807 808 if s.quit.HasFired() { 809 return nil 810 } 811 return err 812 } 813 tempDelay = 0 814 // Start a new goroutine to deal with rawConn so we don't stall this Accept 815 // loop goroutine. 816 // 817 // Make sure we account for the goroutine so GracefulStop doesn't nil out 818 // s.conns before this conn can be added. 819 s.serveWG.Add(1) 820 go func() { 821 s.handleRawConn(lis.Addr().String(), rawConn) 822 s.serveWG.Done() 823 }() 824 } 825} 826 827// handleRawConn forks a goroutine to handle a just-accepted connection that 828// has not had any I/O performed on it yet. 829func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { 830 if s.quit.HasFired() { 831 rawConn.Close() 832 return 833 } 834 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) 835 836 // Finish handshaking (HTTP2) 837 st := s.newHTTP2Transport(rawConn) 838 rawConn.SetDeadline(time.Time{}) 839 if st == nil { 840 return 841 } 842 843 if !s.addConn(lisAddr, st) { 844 return 845 } 846 go func() { 847 s.serveStreams(st) 848 s.removeConn(lisAddr, st) 849 }() 850} 851 852func (s *Server) drainServerTransports(addr string) { 853 s.mu.Lock() 854 conns := s.conns[addr] 855 for st := range conns { 856 st.Drain() 857 } 858 s.mu.Unlock() 859} 860 861// newHTTP2Transport sets up a http/2 transport (using the 862// gRPC http2 server transport in transport/http2_server.go). 863func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { 864 config := &transport.ServerConfig{ 865 MaxStreams: s.opts.maxConcurrentStreams, 866 ConnectionTimeout: s.opts.connectionTimeout, 867 Credentials: s.opts.creds, 868 InTapHandle: s.opts.inTapHandle, 869 StatsHandler: s.opts.statsHandler, 870 KeepaliveParams: s.opts.keepaliveParams, 871 KeepalivePolicy: s.opts.keepalivePolicy, 872 InitialWindowSize: s.opts.initialWindowSize, 873 InitialConnWindowSize: s.opts.initialConnWindowSize, 874 WriteBufferSize: s.opts.writeBufferSize, 875 ReadBufferSize: s.opts.readBufferSize, 876 ChannelzParentID: s.channelzID, 877 MaxHeaderListSize: s.opts.maxHeaderListSize, 878 HeaderTableSize: s.opts.headerTableSize, 879 } 880 st, err := transport.NewServerTransport(c, config) 881 if err != nil { 882 s.mu.Lock() 883 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) 884 s.mu.Unlock() 885 // ErrConnDispatched means that the connection was dispatched away from 886 // gRPC; those connections should be left open. 887 if err != credentials.ErrConnDispatched { 888 // Don't log on ErrConnDispatched and io.EOF to prevent log spam. 889 if err != io.EOF { 890 channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) 891 } 892 c.Close() 893 } 894 return nil 895 } 896 897 return st 898} 899 900func (s *Server) serveStreams(st transport.ServerTransport) { 901 defer st.Close() 902 var wg sync.WaitGroup 903 904 var roundRobinCounter uint32 905 st.HandleStreams(func(stream *transport.Stream) { 906 wg.Add(1) 907 if s.opts.numServerWorkers > 0 { 908 data := &serverWorkerData{st: st, wg: &wg, stream: stream} 909 select { 910 case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data: 911 default: 912 // If all stream workers are busy, fallback to the default code path. 913 go func() { 914 s.handleStream(st, stream, s.traceInfo(st, stream)) 915 wg.Done() 916 }() 917 } 918 } else { 919 go func() { 920 defer wg.Done() 921 s.handleStream(st, stream, s.traceInfo(st, stream)) 922 }() 923 } 924 }, func(ctx context.Context, method string) context.Context { 925 if !EnableTracing { 926 return ctx 927 } 928 tr := trace.New("grpc.Recv."+methodFamily(method), method) 929 return trace.NewContext(ctx, tr) 930 }) 931 wg.Wait() 932} 933 934var _ http.Handler = (*Server)(nil) 935 936// ServeHTTP implements the Go standard library's http.Handler 937// interface by responding to the gRPC request r, by looking up 938// the requested gRPC method in the gRPC server s. 939// 940// The provided HTTP request must have arrived on an HTTP/2 941// connection. When using the Go standard library's server, 942// practically this means that the Request must also have arrived 943// over TLS. 944// 945// To share one port (such as 443 for https) between gRPC and an 946// existing http.Handler, use a root http.Handler such as: 947// 948// if r.ProtoMajor == 2 && strings.HasPrefix( 949// r.Header.Get("Content-Type"), "application/grpc") { 950// grpcServer.ServeHTTP(w, r) 951// } else { 952// yourMux.ServeHTTP(w, r) 953// } 954// 955// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally 956// separate from grpc-go's HTTP/2 server. Performance and features may vary 957// between the two paths. ServeHTTP does not support some gRPC features 958// available through grpc-go's HTTP/2 server. 959// 960// Experimental 961// 962// Notice: This API is EXPERIMENTAL and may be changed or removed in a 963// later release. 964func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 965 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) 966 if err != nil { 967 http.Error(w, err.Error(), http.StatusInternalServerError) 968 return 969 } 970 if !s.addConn(listenerAddressForServeHTTP, st) { 971 return 972 } 973 defer s.removeConn(listenerAddressForServeHTTP, st) 974 s.serveStreams(st) 975} 976 977// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. 978// If tracing is not enabled, it returns nil. 979func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { 980 if !EnableTracing { 981 return nil 982 } 983 tr, ok := trace.FromContext(stream.Context()) 984 if !ok { 985 return nil 986 } 987 988 trInfo = &traceInfo{ 989 tr: tr, 990 firstLine: firstLine{ 991 client: false, 992 remoteAddr: st.RemoteAddr(), 993 }, 994 } 995 if dl, ok := stream.Context().Deadline(); ok { 996 trInfo.firstLine.deadline = time.Until(dl) 997 } 998 return trInfo 999} 1000 1001func (s *Server) addConn(addr string, st transport.ServerTransport) bool { 1002 s.mu.Lock() 1003 defer s.mu.Unlock() 1004 if s.conns == nil { 1005 st.Close() 1006 return false 1007 } 1008 if s.drain { 1009 // Transport added after we drained our existing conns: drain it 1010 // immediately. 1011 st.Drain() 1012 } 1013 1014 if s.conns[addr] == nil { 1015 // Create a map entry if this is the first connection on this listener. 1016 s.conns[addr] = make(map[transport.ServerTransport]bool) 1017 } 1018 s.conns[addr][st] = true 1019 return true 1020} 1021 1022func (s *Server) removeConn(addr string, st transport.ServerTransport) { 1023 s.mu.Lock() 1024 defer s.mu.Unlock() 1025 1026 conns := s.conns[addr] 1027 if conns != nil { 1028 delete(conns, st) 1029 if len(conns) == 0 { 1030 // If the last connection for this address is being removed, also 1031 // remove the map entry corresponding to the address. This is used 1032 // in GracefulStop() when waiting for all connections to be closed. 1033 delete(s.conns, addr) 1034 } 1035 s.cv.Broadcast() 1036 } 1037} 1038 1039func (s *Server) channelzMetric() *channelz.ServerInternalMetric { 1040 return &channelz.ServerInternalMetric{ 1041 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted), 1042 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded), 1043 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed), 1044 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)), 1045 } 1046} 1047 1048func (s *Server) incrCallsStarted() { 1049 atomic.AddInt64(&s.czData.callsStarted, 1) 1050 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano()) 1051} 1052 1053func (s *Server) incrCallsSucceeded() { 1054 atomic.AddInt64(&s.czData.callsSucceeded, 1) 1055} 1056 1057func (s *Server) incrCallsFailed() { 1058 atomic.AddInt64(&s.czData.callsFailed, 1) 1059} 1060 1061func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { 1062 data, err := encode(s.getCodec(stream.ContentSubtype()), msg) 1063 if err != nil { 1064 channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err) 1065 return err 1066 } 1067 compData, err := compress(data, cp, comp) 1068 if err != nil { 1069 channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err) 1070 return err 1071 } 1072 hdr, payload := msgHeader(data, compData) 1073 // TODO(dfawley): should we be checking len(data) instead? 1074 if len(payload) > s.opts.maxSendMessageSize { 1075 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) 1076 } 1077 err = t.Write(stream, hdr, payload, opts) 1078 if err == nil && s.opts.statsHandler != nil { 1079 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) 1080 } 1081 return err 1082} 1083 1084// chainUnaryServerInterceptors chains all unary server interceptors into one. 1085func chainUnaryServerInterceptors(s *Server) { 1086 // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will 1087 // be executed before any other chained interceptors. 1088 interceptors := s.opts.chainUnaryInts 1089 if s.opts.unaryInt != nil { 1090 interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...) 1091 } 1092 1093 var chainedInt UnaryServerInterceptor 1094 if len(interceptors) == 0 { 1095 chainedInt = nil 1096 } else if len(interceptors) == 1 { 1097 chainedInt = interceptors[0] 1098 } else { 1099 chainedInt = chainUnaryInterceptors(interceptors) 1100 } 1101 1102 s.opts.unaryInt = chainedInt 1103} 1104 1105func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor { 1106 return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) { 1107 // the struct ensures the variables are allocated together, rather than separately, since we 1108 // know they should be garbage collected together. This saves 1 allocation and decreases 1109 // time/call by about 10% on the microbenchmark. 1110 var state struct { 1111 i int 1112 next UnaryHandler 1113 } 1114 state.next = func(ctx context.Context, req interface{}) (interface{}, error) { 1115 if state.i == len(interceptors)-1 { 1116 return interceptors[state.i](ctx, req, info, handler) 1117 } 1118 state.i++ 1119 return interceptors[state.i-1](ctx, req, info, state.next) 1120 } 1121 return state.next(ctx, req) 1122 } 1123} 1124 1125func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { 1126 sh := s.opts.statsHandler 1127 if sh != nil || trInfo != nil || channelz.IsOn() { 1128 if channelz.IsOn() { 1129 s.incrCallsStarted() 1130 } 1131 var statsBegin *stats.Begin 1132 if sh != nil { 1133 beginTime := time.Now() 1134 statsBegin = &stats.Begin{ 1135 BeginTime: beginTime, 1136 IsClientStream: false, 1137 IsServerStream: false, 1138 } 1139 sh.HandleRPC(stream.Context(), statsBegin) 1140 } 1141 if trInfo != nil { 1142 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1143 } 1144 // The deferred error handling for tracing, stats handler and channelz are 1145 // combined into one function to reduce stack usage -- a defer takes ~56-64 1146 // bytes on the stack, so overflowing the stack will require a stack 1147 // re-allocation, which is expensive. 1148 // 1149 // To maintain behavior similar to separate deferred statements, statements 1150 // should be executed in the reverse order. That is, tracing first, stats 1151 // handler second, and channelz last. Note that panics *within* defers will 1152 // lead to different behavior, but that's an acceptable compromise; that 1153 // would be undefined behavior territory anyway. 1154 defer func() { 1155 if trInfo != nil { 1156 if err != nil && err != io.EOF { 1157 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1158 trInfo.tr.SetError() 1159 } 1160 trInfo.tr.Finish() 1161 } 1162 1163 if sh != nil { 1164 end := &stats.End{ 1165 BeginTime: statsBegin.BeginTime, 1166 EndTime: time.Now(), 1167 } 1168 if err != nil && err != io.EOF { 1169 end.Error = toRPCErr(err) 1170 } 1171 sh.HandleRPC(stream.Context(), end) 1172 } 1173 1174 if channelz.IsOn() { 1175 if err != nil && err != io.EOF { 1176 s.incrCallsFailed() 1177 } else { 1178 s.incrCallsSucceeded() 1179 } 1180 } 1181 }() 1182 } 1183 1184 binlog := binarylog.GetMethodLogger(stream.Method()) 1185 if binlog != nil { 1186 ctx := stream.Context() 1187 md, _ := metadata.FromIncomingContext(ctx) 1188 logEntry := &binarylog.ClientHeader{ 1189 Header: md, 1190 MethodName: stream.Method(), 1191 PeerAddr: nil, 1192 } 1193 if deadline, ok := ctx.Deadline(); ok { 1194 logEntry.Timeout = time.Until(deadline) 1195 if logEntry.Timeout < 0 { 1196 logEntry.Timeout = 0 1197 } 1198 } 1199 if a := md[":authority"]; len(a) > 0 { 1200 logEntry.Authority = a[0] 1201 } 1202 if peer, ok := peer.FromContext(ctx); ok { 1203 logEntry.PeerAddr = peer.Addr 1204 } 1205 binlog.Log(logEntry) 1206 } 1207 1208 // comp and cp are used for compression. decomp and dc are used for 1209 // decompression. If comp and decomp are both set, they are the same; 1210 // however they are kept separate to ensure that at most one of the 1211 // compressor/decompressor variable pairs are set for use later. 1212 var comp, decomp encoding.Compressor 1213 var cp Compressor 1214 var dc Decompressor 1215 1216 // If dc is set and matches the stream's compression, use it. Otherwise, try 1217 // to find a matching registered compressor for decomp. 1218 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1219 dc = s.opts.dc 1220 } else if rc != "" && rc != encoding.Identity { 1221 decomp = encoding.GetCompressor(rc) 1222 if decomp == nil { 1223 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1224 t.WriteStatus(stream, st) 1225 return st.Err() 1226 } 1227 } 1228 1229 // If cp is set, use it. Otherwise, attempt to compress the response using 1230 // the incoming message compression method. 1231 // 1232 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1233 if s.opts.cp != nil { 1234 cp = s.opts.cp 1235 stream.SetSendCompress(cp.Type()) 1236 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1237 // Legacy compressor not specified; attempt to respond with same encoding. 1238 comp = encoding.GetCompressor(rc) 1239 if comp != nil { 1240 stream.SetSendCompress(rc) 1241 } 1242 } 1243 1244 var payInfo *payloadInfo 1245 if sh != nil || binlog != nil { 1246 payInfo = &payloadInfo{} 1247 } 1248 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) 1249 if err != nil { 1250 if e := t.WriteStatus(stream, status.Convert(err)); e != nil { 1251 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e) 1252 } 1253 return err 1254 } 1255 if channelz.IsOn() { 1256 t.IncrMsgRecv() 1257 } 1258 df := func(v interface{}) error { 1259 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { 1260 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) 1261 } 1262 if sh != nil { 1263 sh.HandleRPC(stream.Context(), &stats.InPayload{ 1264 RecvTime: time.Now(), 1265 Payload: v, 1266 WireLength: payInfo.wireLength + headerLen, 1267 Data: d, 1268 Length: len(d), 1269 }) 1270 } 1271 if binlog != nil { 1272 binlog.Log(&binarylog.ClientMessage{ 1273 Message: d, 1274 }) 1275 } 1276 if trInfo != nil { 1277 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) 1278 } 1279 return nil 1280 } 1281 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1282 reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt) 1283 if appErr != nil { 1284 appStatus, ok := status.FromError(appErr) 1285 if !ok { 1286 // Convert appErr if it is not a grpc status error. 1287 appErr = status.Error(codes.Unknown, appErr.Error()) 1288 appStatus, _ = status.FromError(appErr) 1289 } 1290 if trInfo != nil { 1291 trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1292 trInfo.tr.SetError() 1293 } 1294 if e := t.WriteStatus(stream, appStatus); e != nil { 1295 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) 1296 } 1297 if binlog != nil { 1298 if h, _ := stream.Header(); h.Len() > 0 { 1299 // Only log serverHeader if there was header. Otherwise it can 1300 // be trailer only. 1301 binlog.Log(&binarylog.ServerHeader{ 1302 Header: h, 1303 }) 1304 } 1305 binlog.Log(&binarylog.ServerTrailer{ 1306 Trailer: stream.Trailer(), 1307 Err: appErr, 1308 }) 1309 } 1310 return appErr 1311 } 1312 if trInfo != nil { 1313 trInfo.tr.LazyLog(stringer("OK"), false) 1314 } 1315 opts := &transport.Options{Last: true} 1316 1317 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { 1318 if err == io.EOF { 1319 // The entire stream is done (for unary RPC only). 1320 return err 1321 } 1322 if sts, ok := status.FromError(err); ok { 1323 if e := t.WriteStatus(stream, sts); e != nil { 1324 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) 1325 } 1326 } else { 1327 switch st := err.(type) { 1328 case transport.ConnectionError: 1329 // Nothing to do here. 1330 default: 1331 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) 1332 } 1333 } 1334 if binlog != nil { 1335 h, _ := stream.Header() 1336 binlog.Log(&binarylog.ServerHeader{ 1337 Header: h, 1338 }) 1339 binlog.Log(&binarylog.ServerTrailer{ 1340 Trailer: stream.Trailer(), 1341 Err: appErr, 1342 }) 1343 } 1344 return err 1345 } 1346 if binlog != nil { 1347 h, _ := stream.Header() 1348 binlog.Log(&binarylog.ServerHeader{ 1349 Header: h, 1350 }) 1351 binlog.Log(&binarylog.ServerMessage{ 1352 Message: reply, 1353 }) 1354 } 1355 if channelz.IsOn() { 1356 t.IncrMsgSent() 1357 } 1358 if trInfo != nil { 1359 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) 1360 } 1361 // TODO: Should we be logging if writing status failed here, like above? 1362 // Should the logging be in WriteStatus? Should we ignore the WriteStatus 1363 // error or allow the stats handler to see it? 1364 err = t.WriteStatus(stream, statusOK) 1365 if binlog != nil { 1366 binlog.Log(&binarylog.ServerTrailer{ 1367 Trailer: stream.Trailer(), 1368 Err: appErr, 1369 }) 1370 } 1371 return err 1372} 1373 1374// chainStreamServerInterceptors chains all stream server interceptors into one. 1375func chainStreamServerInterceptors(s *Server) { 1376 // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will 1377 // be executed before any other chained interceptors. 1378 interceptors := s.opts.chainStreamInts 1379 if s.opts.streamInt != nil { 1380 interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...) 1381 } 1382 1383 var chainedInt StreamServerInterceptor 1384 if len(interceptors) == 0 { 1385 chainedInt = nil 1386 } else if len(interceptors) == 1 { 1387 chainedInt = interceptors[0] 1388 } else { 1389 chainedInt = chainStreamInterceptors(interceptors) 1390 } 1391 1392 s.opts.streamInt = chainedInt 1393} 1394 1395func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor { 1396 return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error { 1397 // the struct ensures the variables are allocated together, rather than separately, since we 1398 // know they should be garbage collected together. This saves 1 allocation and decreases 1399 // time/call by about 10% on the microbenchmark. 1400 var state struct { 1401 i int 1402 next StreamHandler 1403 } 1404 state.next = func(srv interface{}, ss ServerStream) error { 1405 if state.i == len(interceptors)-1 { 1406 return interceptors[state.i](srv, ss, info, handler) 1407 } 1408 state.i++ 1409 return interceptors[state.i-1](srv, ss, info, state.next) 1410 } 1411 return state.next(srv, ss) 1412 } 1413} 1414 1415func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) { 1416 if channelz.IsOn() { 1417 s.incrCallsStarted() 1418 } 1419 sh := s.opts.statsHandler 1420 var statsBegin *stats.Begin 1421 if sh != nil { 1422 beginTime := time.Now() 1423 statsBegin = &stats.Begin{ 1424 BeginTime: beginTime, 1425 IsClientStream: sd.ClientStreams, 1426 IsServerStream: sd.ServerStreams, 1427 } 1428 sh.HandleRPC(stream.Context(), statsBegin) 1429 } 1430 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1431 ss := &serverStream{ 1432 ctx: ctx, 1433 t: t, 1434 s: stream, 1435 p: &parser{r: stream}, 1436 codec: s.getCodec(stream.ContentSubtype()), 1437 maxReceiveMessageSize: s.opts.maxReceiveMessageSize, 1438 maxSendMessageSize: s.opts.maxSendMessageSize, 1439 trInfo: trInfo, 1440 statsHandler: sh, 1441 } 1442 1443 if sh != nil || trInfo != nil || channelz.IsOn() { 1444 // See comment in processUnaryRPC on defers. 1445 defer func() { 1446 if trInfo != nil { 1447 ss.mu.Lock() 1448 if err != nil && err != io.EOF { 1449 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1450 ss.trInfo.tr.SetError() 1451 } 1452 ss.trInfo.tr.Finish() 1453 ss.trInfo.tr = nil 1454 ss.mu.Unlock() 1455 } 1456 1457 if sh != nil { 1458 end := &stats.End{ 1459 BeginTime: statsBegin.BeginTime, 1460 EndTime: time.Now(), 1461 } 1462 if err != nil && err != io.EOF { 1463 end.Error = toRPCErr(err) 1464 } 1465 sh.HandleRPC(stream.Context(), end) 1466 } 1467 1468 if channelz.IsOn() { 1469 if err != nil && err != io.EOF { 1470 s.incrCallsFailed() 1471 } else { 1472 s.incrCallsSucceeded() 1473 } 1474 } 1475 }() 1476 } 1477 1478 ss.binlog = binarylog.GetMethodLogger(stream.Method()) 1479 if ss.binlog != nil { 1480 md, _ := metadata.FromIncomingContext(ctx) 1481 logEntry := &binarylog.ClientHeader{ 1482 Header: md, 1483 MethodName: stream.Method(), 1484 PeerAddr: nil, 1485 } 1486 if deadline, ok := ctx.Deadline(); ok { 1487 logEntry.Timeout = time.Until(deadline) 1488 if logEntry.Timeout < 0 { 1489 logEntry.Timeout = 0 1490 } 1491 } 1492 if a := md[":authority"]; len(a) > 0 { 1493 logEntry.Authority = a[0] 1494 } 1495 if peer, ok := peer.FromContext(ss.Context()); ok { 1496 logEntry.PeerAddr = peer.Addr 1497 } 1498 ss.binlog.Log(logEntry) 1499 } 1500 1501 // If dc is set and matches the stream's compression, use it. Otherwise, try 1502 // to find a matching registered compressor for decomp. 1503 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1504 ss.dc = s.opts.dc 1505 } else if rc != "" && rc != encoding.Identity { 1506 ss.decomp = encoding.GetCompressor(rc) 1507 if ss.decomp == nil { 1508 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1509 t.WriteStatus(ss.s, st) 1510 return st.Err() 1511 } 1512 } 1513 1514 // If cp is set, use it. Otherwise, attempt to compress the response using 1515 // the incoming message compression method. 1516 // 1517 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1518 if s.opts.cp != nil { 1519 ss.cp = s.opts.cp 1520 stream.SetSendCompress(s.opts.cp.Type()) 1521 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1522 // Legacy compressor not specified; attempt to respond with same encoding. 1523 ss.comp = encoding.GetCompressor(rc) 1524 if ss.comp != nil { 1525 stream.SetSendCompress(rc) 1526 } 1527 } 1528 1529 ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp) 1530 1531 if trInfo != nil { 1532 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1533 } 1534 var appErr error 1535 var server interface{} 1536 if info != nil { 1537 server = info.serviceImpl 1538 } 1539 if s.opts.streamInt == nil { 1540 appErr = sd.Handler(server, ss) 1541 } else { 1542 info := &StreamServerInfo{ 1543 FullMethod: stream.Method(), 1544 IsClientStream: sd.ClientStreams, 1545 IsServerStream: sd.ServerStreams, 1546 } 1547 appErr = s.opts.streamInt(server, ss, info, sd.Handler) 1548 } 1549 if appErr != nil { 1550 appStatus, ok := status.FromError(appErr) 1551 if !ok { 1552 appStatus = status.New(codes.Unknown, appErr.Error()) 1553 appErr = appStatus.Err() 1554 } 1555 if trInfo != nil { 1556 ss.mu.Lock() 1557 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1558 ss.trInfo.tr.SetError() 1559 ss.mu.Unlock() 1560 } 1561 t.WriteStatus(ss.s, appStatus) 1562 if ss.binlog != nil { 1563 ss.binlog.Log(&binarylog.ServerTrailer{ 1564 Trailer: ss.s.Trailer(), 1565 Err: appErr, 1566 }) 1567 } 1568 // TODO: Should we log an error from WriteStatus here and below? 1569 return appErr 1570 } 1571 if trInfo != nil { 1572 ss.mu.Lock() 1573 ss.trInfo.tr.LazyLog(stringer("OK"), false) 1574 ss.mu.Unlock() 1575 } 1576 err = t.WriteStatus(ss.s, statusOK) 1577 if ss.binlog != nil { 1578 ss.binlog.Log(&binarylog.ServerTrailer{ 1579 Trailer: ss.s.Trailer(), 1580 Err: appErr, 1581 }) 1582 } 1583 return err 1584} 1585 1586func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { 1587 sm := stream.Method() 1588 if sm != "" && sm[0] == '/' { 1589 sm = sm[1:] 1590 } 1591 pos := strings.LastIndex(sm, "/") 1592 if pos == -1 { 1593 if trInfo != nil { 1594 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) 1595 trInfo.tr.SetError() 1596 } 1597 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) 1598 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1599 if trInfo != nil { 1600 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1601 trInfo.tr.SetError() 1602 } 1603 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) 1604 } 1605 if trInfo != nil { 1606 trInfo.tr.Finish() 1607 } 1608 return 1609 } 1610 service := sm[:pos] 1611 method := sm[pos+1:] 1612 1613 srv, knownService := s.services[service] 1614 if knownService { 1615 if md, ok := srv.methods[method]; ok { 1616 s.processUnaryRPC(t, stream, srv, md, trInfo) 1617 return 1618 } 1619 if sd, ok := srv.streams[method]; ok { 1620 s.processStreamingRPC(t, stream, srv, sd, trInfo) 1621 return 1622 } 1623 } 1624 // Unknown service, or known server unknown method. 1625 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1626 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1627 return 1628 } 1629 var errDesc string 1630 if !knownService { 1631 errDesc = fmt.Sprintf("unknown service %v", service) 1632 } else { 1633 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service) 1634 } 1635 if trInfo != nil { 1636 trInfo.tr.LazyPrintf("%s", errDesc) 1637 trInfo.tr.SetError() 1638 } 1639 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1640 if trInfo != nil { 1641 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1642 trInfo.tr.SetError() 1643 } 1644 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) 1645 } 1646 if trInfo != nil { 1647 trInfo.tr.Finish() 1648 } 1649} 1650 1651// The key to save ServerTransportStream in the context. 1652type streamKey struct{} 1653 1654// NewContextWithServerTransportStream creates a new context from ctx and 1655// attaches stream to it. 1656// 1657// Experimental 1658// 1659// Notice: This API is EXPERIMENTAL and may be changed or removed in a 1660// later release. 1661func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { 1662 return context.WithValue(ctx, streamKey{}, stream) 1663} 1664 1665// ServerTransportStream is a minimal interface that a transport stream must 1666// implement. This can be used to mock an actual transport stream for tests of 1667// handler code that use, for example, grpc.SetHeader (which requires some 1668// stream to be in context). 1669// 1670// See also NewContextWithServerTransportStream. 1671// 1672// Experimental 1673// 1674// Notice: This type is EXPERIMENTAL and may be changed or removed in a 1675// later release. 1676type ServerTransportStream interface { 1677 Method() string 1678 SetHeader(md metadata.MD) error 1679 SendHeader(md metadata.MD) error 1680 SetTrailer(md metadata.MD) error 1681} 1682 1683// ServerTransportStreamFromContext returns the ServerTransportStream saved in 1684// ctx. Returns nil if the given context has no stream associated with it 1685// (which implies it is not an RPC invocation context). 1686// 1687// Experimental 1688// 1689// Notice: This API is EXPERIMENTAL and may be changed or removed in a 1690// later release. 1691func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { 1692 s, _ := ctx.Value(streamKey{}).(ServerTransportStream) 1693 return s 1694} 1695 1696// Stop stops the gRPC server. It immediately closes all open 1697// connections and listeners. 1698// It cancels all active RPCs on the server side and the corresponding 1699// pending RPCs on the client side will get notified by connection 1700// errors. 1701func (s *Server) Stop() { 1702 s.quit.Fire() 1703 1704 defer func() { 1705 s.serveWG.Wait() 1706 s.done.Fire() 1707 }() 1708 1709 s.channelzRemoveOnce.Do(func() { 1710 if channelz.IsOn() { 1711 channelz.RemoveEntry(s.channelzID) 1712 } 1713 }) 1714 1715 s.mu.Lock() 1716 listeners := s.lis 1717 s.lis = nil 1718 conns := s.conns 1719 s.conns = nil 1720 // interrupt GracefulStop if Stop and GracefulStop are called concurrently. 1721 s.cv.Broadcast() 1722 s.mu.Unlock() 1723 1724 for lis := range listeners { 1725 lis.Close() 1726 } 1727 for _, cs := range conns { 1728 for st := range cs { 1729 st.Close() 1730 } 1731 } 1732 if s.opts.numServerWorkers > 0 { 1733 s.stopServerWorkers() 1734 } 1735 1736 s.mu.Lock() 1737 if s.events != nil { 1738 s.events.Finish() 1739 s.events = nil 1740 } 1741 s.mu.Unlock() 1742} 1743 1744// GracefulStop stops the gRPC server gracefully. It stops the server from 1745// accepting new connections and RPCs and blocks until all the pending RPCs are 1746// finished. 1747func (s *Server) GracefulStop() { 1748 s.quit.Fire() 1749 defer s.done.Fire() 1750 1751 s.channelzRemoveOnce.Do(func() { 1752 if channelz.IsOn() { 1753 channelz.RemoveEntry(s.channelzID) 1754 } 1755 }) 1756 s.mu.Lock() 1757 if s.conns == nil { 1758 s.mu.Unlock() 1759 return 1760 } 1761 1762 for lis := range s.lis { 1763 lis.Close() 1764 } 1765 s.lis = nil 1766 if !s.drain { 1767 for _, conns := range s.conns { 1768 for st := range conns { 1769 st.Drain() 1770 } 1771 } 1772 s.drain = true 1773 } 1774 1775 // Wait for serving threads to be ready to exit. Only then can we be sure no 1776 // new conns will be created. 1777 s.mu.Unlock() 1778 s.serveWG.Wait() 1779 s.mu.Lock() 1780 1781 for len(s.conns) != 0 { 1782 s.cv.Wait() 1783 } 1784 s.conns = nil 1785 if s.events != nil { 1786 s.events.Finish() 1787 s.events = nil 1788 } 1789 s.mu.Unlock() 1790} 1791 1792// contentSubtype must be lowercase 1793// cannot return nil 1794func (s *Server) getCodec(contentSubtype string) baseCodec { 1795 if s.opts.codec != nil { 1796 return s.opts.codec 1797 } 1798 if contentSubtype == "" { 1799 return encoding.GetCodec(proto.Name) 1800 } 1801 codec := encoding.GetCodec(contentSubtype) 1802 if codec == nil { 1803 return encoding.GetCodec(proto.Name) 1804 } 1805 return codec 1806} 1807 1808// SetHeader sets the header metadata. 1809// When called multiple times, all the provided metadata will be merged. 1810// All the metadata will be sent out when one of the following happens: 1811// - grpc.SendHeader() is called; 1812// - The first response is sent out; 1813// - An RPC status is sent out (error or success). 1814func SetHeader(ctx context.Context, md metadata.MD) error { 1815 if md.Len() == 0 { 1816 return nil 1817 } 1818 stream := ServerTransportStreamFromContext(ctx) 1819 if stream == nil { 1820 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1821 } 1822 return stream.SetHeader(md) 1823} 1824 1825// SendHeader sends header metadata. It may be called at most once. 1826// The provided md and headers set by SetHeader() will be sent. 1827func SendHeader(ctx context.Context, md metadata.MD) error { 1828 stream := ServerTransportStreamFromContext(ctx) 1829 if stream == nil { 1830 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1831 } 1832 if err := stream.SendHeader(md); err != nil { 1833 return toRPCErr(err) 1834 } 1835 return nil 1836} 1837 1838// SetTrailer sets the trailer metadata that will be sent when an RPC returns. 1839// When called more than once, all the provided metadata will be merged. 1840func SetTrailer(ctx context.Context, md metadata.MD) error { 1841 if md.Len() == 0 { 1842 return nil 1843 } 1844 stream := ServerTransportStreamFromContext(ctx) 1845 if stream == nil { 1846 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1847 } 1848 return stream.SetTrailer(md) 1849} 1850 1851// Method returns the method string for the server context. The returned 1852// string is in the format of "/service/method". 1853func Method(ctx context.Context) (string, bool) { 1854 s := ServerTransportStreamFromContext(ctx) 1855 if s == nil { 1856 return "", false 1857 } 1858 return s.Method(), true 1859} 1860 1861type channelzServer struct { 1862 s *Server 1863} 1864 1865func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { 1866 return c.s.channelzMetric() 1867} 1868