/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	// Default receive limit is 4 MiB; default send limit is effectively
	// unlimited (MaxInt32).
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32

	// Server transports are tracked in a map which is keyed on listener
	// address. For regular gRPC traffic, connections are accepted in Serve()
	// through a call to Accept(), and we use the actual listener address as key
	// when we add it to the map. But for connections received through
	// ServeHTTP(), we do not have a listener and hence use this dummy value.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)

// init publishes accessors on the internal package so that other grpc
// packages can reach server internals without exporting them here.
func init() {
	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
		return srv.opts.creds
	}
	internal.DrainServerTransports = func(srv *Server, addr string) {
		srv.drainServerTransports(addr)
	}
}

var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")

// methodHandler is the generated-code entry point for a unary method: it
// decodes the request via dec and invokes the service implementation,
// optionally through interceptor.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}

// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl interface{}
	methods     map[string]*MethodDesc
	streams     map[string]*StreamDesc
	mdata       interface{}
}

// serverWorkerData is one unit of work handed to a server worker goroutine:
// the transport and stream to handle, plus the WaitGroup to signal when done.
type serverWorkerData struct {
	st     transport.ServerTransport
	wg     *sync.WaitGroup
	stream *transport.Stream
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	serverWorkerChannels []chan *serverWorkerData
}

// serverOptions holds the server configuration accumulated from ServerOption
// values passed to NewServer.
type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}

var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}

// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fdo *funcServerOption) apply(do *serverOptions) {
	fdo.f(do)
}

// newFuncServerOption wraps f as a ServerOption; f runs against the options
// struct when the server is constructed.
func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: f,
	}
}

// WriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
// The default value for this buffer is 32KB.
// Zero will disable the write buffer such that each write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.writeBufferSize = s
	})
}

// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for one read syscall.
// The default value for this buffer is 32KB.
// Zero will disable read buffer for a connection so data framer can access the underlying
// conn directly.
func ReadBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.readBufferSize = s
	})
}

// InitialWindowSize returns a ServerOption that sets window size for stream.
233// The lower bound for window size is 64K and any value smaller than that will be ignored. 234func InitialWindowSize(s int32) ServerOption { 235 return newFuncServerOption(func(o *serverOptions) { 236 o.initialWindowSize = s 237 }) 238} 239 240// InitialConnWindowSize returns a ServerOption that sets window size for a connection. 241// The lower bound for window size is 64K and any value smaller than that will be ignored. 242func InitialConnWindowSize(s int32) ServerOption { 243 return newFuncServerOption(func(o *serverOptions) { 244 o.initialConnWindowSize = s 245 }) 246} 247 248// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 249func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 250 if kp.Time > 0 && kp.Time < time.Second { 251 logger.Warning("Adjusting keepalive ping interval to minimum period of 1s") 252 kp.Time = time.Second 253 } 254 255 return newFuncServerOption(func(o *serverOptions) { 256 o.keepaliveParams = kp 257 }) 258} 259 260// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 261func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 262 return newFuncServerOption(func(o *serverOptions) { 263 o.keepalivePolicy = kep 264 }) 265} 266 267// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 268// 269// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 270// 271// Deprecated: register codecs using encoding.RegisterCodec. The server will 272// automatically use registered codecs based on the incoming requests' headers. 273// See also 274// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. 275// Will be supported throughout 1.x. 
func CustomCodec(codec Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// ForceServerCodec returns a ServerOption that sets a codec for message
// marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered
// with RegisterCodec.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between encoding.Codec
// and content-subtype.
//
// This function is provided for advanced users; prefer to register codecs
// using encoding.RegisterCodec.
// The server will automatically use registered codecs based on the incoming
// requests' headers. See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodec(codec encoding.Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCCompressor(cp Compressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.cp = cp
	})
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages. It has higher priority than decompressors registered via
It has higher priority than decompressors registered via 327// encoding.RegisterCompressor. 328// 329// Deprecated: use encoding.RegisterCompressor instead. Will be supported 330// throughout 1.x. 331func RPCDecompressor(dc Decompressor) ServerOption { 332 return newFuncServerOption(func(o *serverOptions) { 333 o.dc = dc 334 }) 335} 336 337// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 338// If this is not set, gRPC uses the default limit. 339// 340// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x. 341func MaxMsgSize(m int) ServerOption { 342 return MaxRecvMsgSize(m) 343} 344 345// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 346// If this is not set, gRPC uses the default 4MB. 347func MaxRecvMsgSize(m int) ServerOption { 348 return newFuncServerOption(func(o *serverOptions) { 349 o.maxReceiveMessageSize = m 350 }) 351} 352 353// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 354// If this is not set, gRPC uses the default `math.MaxInt32`. 355func MaxSendMsgSize(m int) ServerOption { 356 return newFuncServerOption(func(o *serverOptions) { 357 o.maxSendMessageSize = m 358 }) 359} 360 361// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 362// of concurrent streams to each ServerTransport. 363func MaxConcurrentStreams(n uint32) ServerOption { 364 return newFuncServerOption(func(o *serverOptions) { 365 o.maxConcurrentStreams = n 366 }) 367} 368 369// Creds returns a ServerOption that sets credentials for server connections. 370func Creds(c credentials.TransportCredentials) ServerOption { 371 return newFuncServerOption(func(o *serverOptions) { 372 o.creds = c 373 }) 374} 375 376// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 377// server. Only one unary interceptor can be installed. 
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		// Installing twice is a programming error; fail loudly.
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
// for unary RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All unary interceptors added by this method will be chained.
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
	})
}

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		// Installing twice is a programming error; fail loudly.
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
// for streaming RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All stream interceptors added by this method will be chained.
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
	})
}

// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		// Installing twice is a programming error; fail loudly.
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	})
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.statsHandler = h
	})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function and stream interceptor (if set) have full access to
// the ServerStream, including its Context.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	})
}

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections. If this is not set, the default is 120 seconds. A zero or
// negative value will result in an immediate timeout.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
469func ConnectionTimeout(d time.Duration) ServerOption { 470 return newFuncServerOption(func(o *serverOptions) { 471 o.connectionTimeout = d 472 }) 473} 474 475// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size 476// of header list that the server is prepared to accept. 477func MaxHeaderListSize(s uint32) ServerOption { 478 return newFuncServerOption(func(o *serverOptions) { 479 o.maxHeaderListSize = &s 480 }) 481} 482 483// HeaderTableSize returns a ServerOption that sets the size of dynamic 484// header table for stream. 485// 486// Experimental 487// 488// Notice: This API is EXPERIMENTAL and may be changed or removed in a 489// later release. 490func HeaderTableSize(s uint32) ServerOption { 491 return newFuncServerOption(func(o *serverOptions) { 492 o.headerTableSize = &s 493 }) 494} 495 496// NumStreamWorkers returns a ServerOption that sets the number of worker 497// goroutines that should be used to process incoming streams. Setting this to 498// zero (default) will disable workers and spawn a new goroutine for each 499// stream. 500// 501// Experimental 502// 503// Notice: This API is EXPERIMENTAL and may be changed or removed in a 504// later release. 505func NumStreamWorkers(numServerWorkers uint32) ServerOption { 506 // TODO: If/when this API gets stabilized (i.e. stream workers become the 507 // only way streams are processed), change the behavior of the zero value to 508 // a sane default. Preliminary experiments suggest that a value equal to the 509 // number of CPUs available is most performant; requires thorough testing. 510 return newFuncServerOption(func(o *serverOptions) { 511 o.numServerWorkers = numServerWorkers 512 }) 513} 514 515// serverWorkerResetThreshold defines how often the stack must be reset. Every 516// N requests, by spawning a new goroutine in its place, a worker can reset its 517// stack so that large stacks don't live in memory forever. 
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16

// serverWorker blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
	// To make sure all server workers don't reset at the same time, choose a
	// random number of iterations before resetting.
	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
	for completed := 0; completed < threshold; completed++ {
		data, ok := <-ch
		if !ok {
			// Channel closed by stopServerWorkers: exit permanently.
			return
		}
		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
		data.wg.Done()
	}
	// Threshold reached: hand the channel to a fresh goroutine so this one's
	// (possibly grown) stack can be reclaimed.
	go s.serverWorker(ch)
}

// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
		go s.serverWorker(s.serverWorkerChannels[i])
	}
}

// stopServerWorkers closes every worker channel, causing each worker
// goroutine to observe the close and exit.
func (s *Server) stopServerWorkers() {
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		close(s.serverWorkerChannels[i])
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	// Start from the package defaults, then apply caller options in order.
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[string]map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	// Fold any chained interceptors into the single-interceptor slots.
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface. It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}

// register records sd/ss in s.services. It fatals if called after Serve or on
// duplicate registration; it acquires s.mu itself.
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.services {
		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
		// Unary methods are neither client- nor server-streaming.
		for m := range srv.methods {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		// Streaming methods report their directionality from the StreamDesc.
		for m, d := range srv.streams {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

// useTransportAuthenticator runs the configured transport credentials'
// handshake on rawConn; with no credentials configured it is a no-op.
func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if s.opts.creds == nil {
		return rawConn, nil, nil
	}
	return s.opts.creds.ServerHandshake(rawConn)
}

// listenSocket pairs a net.Listener with its channelz registration ID so the
// registration can be removed when the listener closes.
type listenSocket struct {
	net.Listener
	channelzID int64
}

func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(l.channelzID)
	}
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Temporary errors (per the net.Error convention) are retried
			// with exponential backoff, capped at 1s.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				// Accept failed because Stop/GracefulStop closed the
				// listener; not an error from the caller's perspective.
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	// Bound the entire handshake (auth + HTTP/2) by connectionTimeout.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		// newHTTP2Transport already closed the conn and logged the failure.
		return
	}

	// Handshake complete: clear the deadline for the connection's lifetime.
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(lisAddr, st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(lisAddr, st)
	}()
}

// drainServerTransports drains every active transport belonging to the
// listener identified by addr.
func (s *Server) drainServerTransports(addr string) {
	s.mu.Lock()
	conns := s.conns[addr]
	for st := range conns {
		st.Drain()
	}
	s.mu.Unlock()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		AuthInfo:              authInfo,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandler:          s.opts.statsHandler,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
	}
	st, err := transport.NewServerTransport("http2", c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		c.Close()
		channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
		// nil signals the caller that the conn has been closed and handled.
		return nil
	}

	return st
}

// serveStreams handles every stream on st, dispatching each either to a
// pooled server worker (when NumStreamWorkers is set) or to a fresh
// goroutine. It blocks until the transport closes and all streams finish.
func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup

	var roundRobinCounter uint32
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			// Non-blocking round-robin send to a worker channel.
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				// If all stream workers are busy, fallback to the default code path.
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//	if r.ProtoMajor == 2 && strings.HasPrefix(
//		r.Header.Get("Content-Type"), "application/grpc") {
//		grpcServer.ServeHTTP(w, r)
//	} else {
//		yourMux.ServeHTTP(w, r)
//	}
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	// There is no real listener here; track the conn under a fixed dummy key.
	if !s.addConn(listenerAddressForServeHTTP, st) {
		return
	}
	defer s.removeConn(listenerAddressForServeHTTP, st)
	s.serveStreams(st)
}

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	if !EnableTracing {
		return nil
	}
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		// No trace event was attached to the stream's context.
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: st.RemoteAddr(),
		},
	}
	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = time.Until(dl)
	}
	return trInfo
}

// addConn registers st in s.conns under addr. It returns false (after closing
// st) if the server has been stopped; if the server is draining, the new
// transport is drained immediately.
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		// Server stopped (Stop sets s.conns to nil); refuse the transport.
		st.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain()
	}

	if s.conns[addr] == nil {
		// Create a map entry if this is the first connection on this listener.
		s.conns[addr] = make(map[transport.ServerTransport]bool)
	}
	s.conns[addr][st] = true
	return true
}

// removeConn unregisters st from s.conns and wakes up GracefulStop waiters.
func (s *Server) removeConn(addr string, st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()

	conns := s.conns[addr]
	if conns != nil {
		delete(conns, st)
		if len(conns) == 0 {
			// If the last connection for this address is being removed, also
			// remove the map entry corresponding to the address. This is used
			// in GracefulStop() when waiting for all connections to be closed.
			delete(s.conns, addr)
		}
		s.cv.Broadcast()
	}
}

// channelzMetric returns a snapshot of the server's channelz call counters.
func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

// incrCallsStarted atomically bumps the started-call counter and records the
// start timestamp for channelz.
func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

// incrCallsSucceeded atomically bumps the succeeded-call counter for channelz.
func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

// incrCallsFailed atomically bumps the failed-call counter for channelz.
func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

// sendResponse encodes msg with the stream's codec, optionally compresses it
// (cp takes precedence over comp), enforces maxSendMessageSize, and writes the
// framed message to the transport, notifying the stats handler on success.
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		// The first interceptor wraps a recursively-built chain of the rest.
		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.unaryInt = chainedInt
}

// getChainUnaryHandler recursively generate the chained UnaryHandler
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		// The last interceptor calls the actual method handler.
		return finalHandler
	}

	return func(ctx context.Context, req interface{}) (interface{}, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

// processUnaryRPC decodes a single request message from stream, invokes the
// method handler (through the unary interceptor, if any), and writes the
// response and status back, feeding tracing, stats, binary logging and
// channelz along the way.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				// io.EOF here means the stream ended normally; count it a success.
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		// Only collect payload details when someone will consume them.
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is the decode function handed to the generated method handler; it
	// unmarshals the already-received bytes into the handler's request value.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength + headerLen,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

// chainStreamServerInterceptors chains all stream server interceptors into one.
func chainStreamServerInterceptors(s *Server) {
	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainStreamInts
	if s.opts.streamInt != nil {
		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
	}

	var chainedInt StreamServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		// The first interceptor wraps a recursively-built chain of the rest.
		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.streamInt = chainedInt
}

// getChainStreamHandler recursively generate the chained StreamHandler
func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
	if curr == len(interceptors)-1 {
		// The last interceptor calls the actual stream handler.
		return finalHandler
	}

	return func(srv interface{}, ss ServerStream) error {
		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
	}
}

// processStreamingRPC wraps stream in a serverStream, invokes the streaming
// handler (through the stream interceptor, if any), and writes the final
// status, feeding tracing, stats, binary logging and channelz along the way.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				// io.EOF here means the stream ended normally; count it a success.
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	var server interface{}
	if info != nil {
		// info is nil for the unknown-service handler (see handleStream).
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

// handleStream parses the stream's full method name ("/service/method"),
// looks up the registered service and method, and dispatches to the unary or
// streaming processor. Unknown methods fall through to the configured
// unknownStreamDesc or an Unimplemented status.
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		// NOTE(review): ResourceExhausted is a surprising code for a malformed
		// method name; Unimplemented (as used below for unknown methods) seems
		// more appropriate — confirm before changing, clients may observe it.
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known server unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

// The key to save ServerTransportStream in the context.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	// Method returns the full RPC method string, e.g. "/service/method".
	Method() string
	// SetHeader merges md into the header metadata to be sent later.
	SetHeader(md metadata.MD) error
	// SendHeader sends the header metadata (merged with prior SetHeader calls).
	SendHeader(md metadata.MD) error
	// SetTrailer merges md into the trailer metadata sent when the RPC ends.
	SetTrailer(md metadata.MD) error
}

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quit.Fire()

	// Fire done only after any in-flight Serve calls have returned.
	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	// Detach the listener and connection maps under the lock, then close them
	// outside the lock so Close calls cannot deadlock with other map users.
	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	conns := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for _, cs := range conns {
		for st := range cs {
			st.Close()
		}
	}
	if s.opts.numServerWorkers > 0 {
		s.stopServerWorkers()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quit.Fire()
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	if s.conns == nil {
		// Already stopped (Stop/GracefulStop completed earlier).
		s.mu.Unlock()
		return
	}

	// Stop accepting new connections, then ask existing transports to drain.
	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	if !s.drain {
		for _, conns := range s.conns {
			for st := range conns {
				st.Drain()
			}
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit. Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	// removeConn broadcasts on s.cv as connections finish; wait until the map
	// is empty (Stop may also broadcast to interrupt this wait).
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// getCodec returns the codec to use for the given content-subtype, falling
// back to the registered proto codec when the subtype is empty or unknown.
// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		// A server-wide codec override takes precedence.
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
//  - grpc.SendHeader() is called;
//  - The first response is sent out;
//  - An RPC status is sent out (error or success).
1807func SetHeader(ctx context.Context, md metadata.MD) error { 1808 if md.Len() == 0 { 1809 return nil 1810 } 1811 stream := ServerTransportStreamFromContext(ctx) 1812 if stream == nil { 1813 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1814 } 1815 return stream.SetHeader(md) 1816} 1817 1818// SendHeader sends header metadata. It may be called at most once. 1819// The provided md and headers set by SetHeader() will be sent. 1820func SendHeader(ctx context.Context, md metadata.MD) error { 1821 stream := ServerTransportStreamFromContext(ctx) 1822 if stream == nil { 1823 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1824 } 1825 if err := stream.SendHeader(md); err != nil { 1826 return toRPCErr(err) 1827 } 1828 return nil 1829} 1830 1831// SetTrailer sets the trailer metadata that will be sent when an RPC returns. 1832// When called more than once, all the provided metadata will be merged. 1833func SetTrailer(ctx context.Context, md metadata.MD) error { 1834 if md.Len() == 0 { 1835 return nil 1836 } 1837 stream := ServerTransportStreamFromContext(ctx) 1838 if stream == nil { 1839 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1840 } 1841 return stream.SetTrailer(md) 1842} 1843 1844// Method returns the method string for the server context. The returned 1845// string is in the format of "/service/method". 1846func Method(ctx context.Context) (string, bool) { 1847 s := ServerTransportStreamFromContext(ctx) 1848 if s == nil { 1849 return "", false 1850 } 1851 return s.Method(), true 1852} 1853 1854type channelzServer struct { 1855 s *Server 1856} 1857 1858func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { 1859 return c.s.channelzMetric() 1860} 1861