/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

// Message-size limits, in bytes, that defaultServerOptions starts from.
const (
	// 4 MiB.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// Effectively unlimited for an int32-sized length.
	defaultServerMaxSendMessageSize = math.MaxInt32
)

// statusOK is a status built from codes.OK and an empty message.
var statusOK = status.New(codes.OK, "")

// methodHandler is the function type stored in MethodDesc.Handler.
// NOTE(review): presumably srv is the registered service implementation and
// dec decodes the request into the message it is given -- confirm against the
// generated code that supplies these handlers.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	// Metadata is copied verbatim into the registered service and reported
	// back through ServiceInfo.Metadata.
	Metadata interface{}
}

// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc
	sd     map[string]*StreamDesc
	mdata  interface{}
}

// serverWorkerData is the unit of work sent to a serverWorker goroutine over
// one of Server.serverWorkerChannels.
type serverWorkerData struct {
	// st is the transport the stream arrived on.
	st transport.ServerTransport
	// wg is marked Done by the worker once the stream has been handled.
	wg *sync.WaitGroup
	// stream is the stream to hand to handleStream.
	stream *transport.Stream
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	// opts is the configuration resolved by NewServer from the defaults and
	// the supplied ServerOptions.
	opts serverOptions

	mu     sync.Mutex // guards following
	lis    map[net.Listener]bool
	conns  map[transport.ServerTransport]bool
	serve  bool
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	events trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	// serverWorkerChannels holds one channel per stream worker; it is only
	// allocated (by initServerWorkers) when opts.numServerWorkers > 0.
	serverWorkerChannels []chan *serverWorkerData
}

// serverOptions is the configuration record that every ServerOption mutates.
// Each field is written by the option function of the matching name.
type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	// maxHeaderListSize and headerTableSize stay nil unless the matching
	// option was supplied.
	maxHeaderListSize *uint32
	headerTableSize   *uint32
	// numServerWorkers is the number of stream worker goroutines; zero means
	// one goroutine is spawned per stream instead.
	numServerWorkers uint32
}

// defaultServerOptions is the value NewServer copies before applying the
// caller's options.
var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// This API is EXPERIMENTAL.
type EmptyServerOption struct{}

// apply is a no-op, which is what makes EmptyServerOption "empty".
func (EmptyServerOption) apply(*serverOptions) {}

// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

// apply runs the wrapped function against do.
func (fdo *funcServerOption) apply(do *serverOptions) {
	fdo.f(do)
}

// newFuncServerOption returns a funcServerOption that will call f when applied.
func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: f,
	}
}

// WriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
// The default value for this buffer is 32KB.
// Zero will disable the write buffer such that each write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
190func WriteBufferSize(s int) ServerOption { 191 return newFuncServerOption(func(o *serverOptions) { 192 o.writeBufferSize = s 193 }) 194} 195 196// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 197// for one read syscall. 198// The default value for this buffer is 32KB. 199// Zero will disable read buffer for a connection so data framer can access the underlying 200// conn directly. 201func ReadBufferSize(s int) ServerOption { 202 return newFuncServerOption(func(o *serverOptions) { 203 o.readBufferSize = s 204 }) 205} 206 207// InitialWindowSize returns a ServerOption that sets window size for stream. 208// The lower bound for window size is 64K and any value smaller than that will be ignored. 209func InitialWindowSize(s int32) ServerOption { 210 return newFuncServerOption(func(o *serverOptions) { 211 o.initialWindowSize = s 212 }) 213} 214 215// InitialConnWindowSize returns a ServerOption that sets window size for a connection. 216// The lower bound for window size is 64K and any value smaller than that will be ignored. 217func InitialConnWindowSize(s int32) ServerOption { 218 return newFuncServerOption(func(o *serverOptions) { 219 o.initialConnWindowSize = s 220 }) 221} 222 223// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 224func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 225 if kp.Time > 0 && kp.Time < time.Second { 226 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s") 227 kp.Time = time.Second 228 } 229 230 return newFuncServerOption(func(o *serverOptions) { 231 o.keepaliveParams = kp 232 }) 233} 234 235// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 
236func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 237 return newFuncServerOption(func(o *serverOptions) { 238 o.keepalivePolicy = kep 239 }) 240} 241 242// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 243// 244// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 245func CustomCodec(codec Codec) ServerOption { 246 return newFuncServerOption(func(o *serverOptions) { 247 o.codec = codec 248 }) 249} 250 251// RPCCompressor returns a ServerOption that sets a compressor for outbound 252// messages. For backward compatibility, all outbound messages will be sent 253// using this compressor, regardless of incoming message compression. By 254// default, server messages will be sent using the same compressor with which 255// request messages were sent. 256// 257// Deprecated: use encoding.RegisterCompressor instead. 258func RPCCompressor(cp Compressor) ServerOption { 259 return newFuncServerOption(func(o *serverOptions) { 260 o.cp = cp 261 }) 262} 263 264// RPCDecompressor returns a ServerOption that sets a decompressor for inbound 265// messages. It has higher priority than decompressors registered via 266// encoding.RegisterCompressor. 267// 268// Deprecated: use encoding.RegisterCompressor instead. 269func RPCDecompressor(dc Decompressor) ServerOption { 270 return newFuncServerOption(func(o *serverOptions) { 271 o.dc = dc 272 }) 273} 274 275// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 276// If this is not set, gRPC uses the default limit. 277// 278// Deprecated: use MaxRecvMsgSize instead. 279func MaxMsgSize(m int) ServerOption { 280 return MaxRecvMsgSize(m) 281} 282 283// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 284// If this is not set, gRPC uses the default 4MB. 
285func MaxRecvMsgSize(m int) ServerOption { 286 return newFuncServerOption(func(o *serverOptions) { 287 o.maxReceiveMessageSize = m 288 }) 289} 290 291// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 292// If this is not set, gRPC uses the default `math.MaxInt32`. 293func MaxSendMsgSize(m int) ServerOption { 294 return newFuncServerOption(func(o *serverOptions) { 295 o.maxSendMessageSize = m 296 }) 297} 298 299// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 300// of concurrent streams to each ServerTransport. 301func MaxConcurrentStreams(n uint32) ServerOption { 302 return newFuncServerOption(func(o *serverOptions) { 303 o.maxConcurrentStreams = n 304 }) 305} 306 307// Creds returns a ServerOption that sets credentials for server connections. 308func Creds(c credentials.TransportCredentials) ServerOption { 309 return newFuncServerOption(func(o *serverOptions) { 310 o.creds = c 311 }) 312} 313 314// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 315// server. Only one unary interceptor can be installed. The construction of multiple 316// interceptors (e.g., chaining) can be implemented at the caller. 317func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { 318 return newFuncServerOption(func(o *serverOptions) { 319 if o.unaryInt != nil { 320 panic("The unary server interceptor was already set and may not be reset.") 321 } 322 o.unaryInt = i 323 }) 324} 325 326// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor 327// for unary RPCs. The first interceptor will be the outer most, 328// while the last interceptor will be the inner most wrapper around the real call. 329// All unary interceptors added by this method will be chained. 
330func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption { 331 return newFuncServerOption(func(o *serverOptions) { 332 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...) 333 }) 334} 335 336// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the 337// server. Only one stream interceptor can be installed. 338func StreamInterceptor(i StreamServerInterceptor) ServerOption { 339 return newFuncServerOption(func(o *serverOptions) { 340 if o.streamInt != nil { 341 panic("The stream server interceptor was already set and may not be reset.") 342 } 343 o.streamInt = i 344 }) 345} 346 347// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor 348// for streaming RPCs. The first interceptor will be the outer most, 349// while the last interceptor will be the inner most wrapper around the real call. 350// All stream interceptors added by this method will be chained. 351func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption { 352 return newFuncServerOption(func(o *serverOptions) { 353 o.chainStreamInts = append(o.chainStreamInts, interceptors...) 354 }) 355} 356 357// InTapHandle returns a ServerOption that sets the tap handle for all the server 358// transport to be created. Only one can be installed. 359func InTapHandle(h tap.ServerInHandle) ServerOption { 360 return newFuncServerOption(func(o *serverOptions) { 361 if o.inTapHandle != nil { 362 panic("The tap handle was already set and may not be reset.") 363 } 364 o.inTapHandle = h 365 }) 366} 367 368// StatsHandler returns a ServerOption that sets the stats handler for the server. 369func StatsHandler(h stats.Handler) ServerOption { 370 return newFuncServerOption(func(o *serverOptions) { 371 o.statsHandler = h 372 }) 373} 374 375// UnknownServiceHandler returns a ServerOption that allows for adding a custom 376// unknown service handler. 
The provided method is a bidi-streaming RPC service 377// handler that will be invoked instead of returning the "unimplemented" gRPC 378// error whenever a request is received for an unregistered service or method. 379// The handling function and stream interceptor (if set) have full access to 380// the ServerStream, including its Context. 381func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 382 return newFuncServerOption(func(o *serverOptions) { 383 o.unknownStreamDesc = &StreamDesc{ 384 StreamName: "unknown_service_handler", 385 Handler: streamHandler, 386 // We need to assume that the users of the streamHandler will want to use both. 387 ClientStreams: true, 388 ServerStreams: true, 389 } 390 }) 391} 392 393// ConnectionTimeout returns a ServerOption that sets the timeout for 394// connection establishment (up to and including HTTP/2 handshaking) for all 395// new connections. If this is not set, the default is 120 seconds. A zero or 396// negative value will result in an immediate timeout. 397// 398// This API is EXPERIMENTAL. 399func ConnectionTimeout(d time.Duration) ServerOption { 400 return newFuncServerOption(func(o *serverOptions) { 401 o.connectionTimeout = d 402 }) 403} 404 405// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size 406// of header list that the server is prepared to accept. 407func MaxHeaderListSize(s uint32) ServerOption { 408 return newFuncServerOption(func(o *serverOptions) { 409 o.maxHeaderListSize = &s 410 }) 411} 412 413// HeaderTableSize returns a ServerOption that sets the size of dynamic 414// header table for stream. 415// 416// This API is EXPERIMENTAL. 417func HeaderTableSize(s uint32) ServerOption { 418 return newFuncServerOption(func(o *serverOptions) { 419 o.headerTableSize = &s 420 }) 421} 422 423// NumStreamWorkers returns a ServerOption that sets the number of worker 424// goroutines that should be used to process incoming streams. 
Setting this to 425// zero (default) will disable workers and spawn a new goroutine for each 426// stream. 427// 428// This API is EXPERIMENTAL. 429func NumStreamWorkers(numServerWorkers uint32) ServerOption { 430 // TODO: If/when this API gets stabilized (i.e. stream workers become the 431 // only way streams are processed), change the behavior of the zero value to 432 // a sane default. Preliminary experiments suggest that a value equal to the 433 // number of CPUs available is most performant; requires thorough testing. 434 return newFuncServerOption(func(o *serverOptions) { 435 o.numServerWorkers = numServerWorkers 436 }) 437} 438 439// serverWorkerResetThreshold defines how often the stack must be reset. Every 440// N requests, by spawning a new goroutine in its place, a worker can reset its 441// stack so that large stacks don't live in memory forever. 2^16 should allow 442// each goroutine stack to live for at least a few seconds in a typical 443// workload (assuming a QPS of a few thousand requests/sec). 444const serverWorkerResetThreshold = 1 << 16 445 446// serverWorkers blocks on a *transport.Stream channel forever and waits for 447// data to be fed by serveStreams. This allows different requests to be 448// processed by the same goroutine, removing the need for expensive stack 449// re-allocations (see the runtime.morestack problem [1]). 450// 451// [1] https://github.com/golang/go/issues/18138 452func (s *Server) serverWorker(ch chan *serverWorkerData) { 453 // To make sure all server workers don't reset at the same time, choose a 454 // random number of iterations before resetting. 
455 threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold) 456 for completed := 0; completed < threshold; completed++ { 457 data, ok := <-ch 458 if !ok { 459 return 460 } 461 s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) 462 data.wg.Done() 463 } 464 go s.serverWorker(ch) 465} 466 467// initServerWorkers creates worker goroutines and channels to process incoming 468// connections to reduce the time spent overall on runtime.morestack. 469func (s *Server) initServerWorkers() { 470 s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers) 471 for i := uint32(0); i < s.opts.numServerWorkers; i++ { 472 s.serverWorkerChannels[i] = make(chan *serverWorkerData) 473 go s.serverWorker(s.serverWorkerChannels[i]) 474 } 475} 476 477func (s *Server) stopServerWorkers() { 478 for i := uint32(0); i < s.opts.numServerWorkers; i++ { 479 close(s.serverWorkerChannels[i]) 480 } 481} 482 483// NewServer creates a gRPC server which has no service registered and has not 484// started to accept requests yet. 485func NewServer(opt ...ServerOption) *Server { 486 opts := defaultServerOptions 487 for _, o := range opt { 488 o.apply(&opts) 489 } 490 s := &Server{ 491 lis: make(map[net.Listener]bool), 492 opts: opts, 493 conns: make(map[transport.ServerTransport]bool), 494 m: make(map[string]*service), 495 quit: grpcsync.NewEvent(), 496 done: grpcsync.NewEvent(), 497 czData: new(channelzData), 498 } 499 chainUnaryServerInterceptors(s) 500 chainStreamServerInterceptors(s) 501 s.cv = sync.NewCond(&s.mu) 502 if EnableTracing { 503 _, file, line, _ := runtime.Caller(1) 504 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 505 } 506 507 if s.opts.numServerWorkers > 0 { 508 s.initServerWorkers() 509 } 510 511 if channelz.IsOn() { 512 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") 513 } 514 return s 515} 516 517// printf records an event in s's event log, unless s has been stopped. 
518// REQUIRES s.mu is held. 519func (s *Server) printf(format string, a ...interface{}) { 520 if s.events != nil { 521 s.events.Printf(format, a...) 522 } 523} 524 525// errorf records an error in s's event log, unless s has been stopped. 526// REQUIRES s.mu is held. 527func (s *Server) errorf(format string, a ...interface{}) { 528 if s.events != nil { 529 s.events.Errorf(format, a...) 530 } 531} 532 533// RegisterService registers a service and its implementation to the gRPC 534// server. It is called from the IDL generated code. This must be called before 535// invoking Serve. 536func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { 537 ht := reflect.TypeOf(sd.HandlerType).Elem() 538 st := reflect.TypeOf(ss) 539 if !st.Implements(ht) { 540 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) 541 } 542 s.register(sd, ss) 543} 544 545func (s *Server) register(sd *ServiceDesc, ss interface{}) { 546 s.mu.Lock() 547 defer s.mu.Unlock() 548 s.printf("RegisterService(%q)", sd.ServiceName) 549 if s.serve { 550 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) 551 } 552 if _, ok := s.m[sd.ServiceName]; ok { 553 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) 554 } 555 srv := &service{ 556 server: ss, 557 md: make(map[string]*MethodDesc), 558 sd: make(map[string]*StreamDesc), 559 mdata: sd.Metadata, 560 } 561 for i := range sd.Methods { 562 d := &sd.Methods[i] 563 srv.md[d.MethodName] = d 564 } 565 for i := range sd.Streams { 566 d := &sd.Streams[i] 567 srv.sd[d.StreamName] = d 568 } 569 s.m[sd.ServiceName] = srv 570} 571 572// MethodInfo contains the information of an RPC including its method name and type. 573type MethodInfo struct { 574 // Name is the method name only, without the service name or package name. 575 Name string 576 // IsClientStream indicates whether the RPC is a client streaming RPC. 
577 IsClientStream bool 578 // IsServerStream indicates whether the RPC is a server streaming RPC. 579 IsServerStream bool 580} 581 582// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. 583type ServiceInfo struct { 584 Methods []MethodInfo 585 // Metadata is the metadata specified in ServiceDesc when registering service. 586 Metadata interface{} 587} 588 589// GetServiceInfo returns a map from service names to ServiceInfo. 590// Service names include the package names, in the form of <package>.<service>. 591func (s *Server) GetServiceInfo() map[string]ServiceInfo { 592 ret := make(map[string]ServiceInfo) 593 for n, srv := range s.m { 594 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) 595 for m := range srv.md { 596 methods = append(methods, MethodInfo{ 597 Name: m, 598 IsClientStream: false, 599 IsServerStream: false, 600 }) 601 } 602 for m, d := range srv.sd { 603 methods = append(methods, MethodInfo{ 604 Name: m, 605 IsClientStream: d.ClientStreams, 606 IsServerStream: d.ServerStreams, 607 }) 608 } 609 610 ret[n] = ServiceInfo{ 611 Methods: methods, 612 Metadata: srv.mdata, 613 } 614 } 615 return ret 616} 617 618// ErrServerStopped indicates that the operation is now illegal because of 619// the server being stopped. 
var ErrServerStopped = errors.New("grpc: the server has been stopped")

// useTransportAuthenticator runs the configured credentials' server handshake
// on rawConn. When no credentials are configured it returns rawConn unchanged
// with nil AuthInfo.
func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if s.opts.creds == nil {
		return rawConn, nil, nil
	}
	return s.opts.creds.ServerHandshake(rawConn)
}

// listenSocket wraps a net.Listener together with the channelz ID it was
// registered under, so that closing it also removes the channelz entry.
type listenSocket struct {
	net.Listener
	channelzID int64
}

// ChannelzMetric reports the listener's socket options and local address.
func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

// Close closes the wrapped listener and, when channelz is on, removes the
// listener's channelz entry. It returns the listener's Close error.
func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(l.channelzID)
	}
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	// Marking the server as serving makes later register calls fatal.
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On the way out, close and forget this listener unless something else
	// has already removed it from (or cleared) s.lis.
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				// Temporary failure: back off exponentially from 5ms,
				// capped at 1s, then retry Accept.
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					// Shutting down while backing off: stop waiting.
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept error after quit has fired is reported as a clean
			// return rather than as a failure.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
745func (s *Server) handleRawConn(rawConn net.Conn) { 746 if s.quit.HasFired() { 747 rawConn.Close() 748 return 749 } 750 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) 751 conn, authInfo, err := s.useTransportAuthenticator(rawConn) 752 if err != nil { 753 // ErrConnDispatched means that the connection was dispatched away from 754 // gRPC; those connections should be left open. 755 if err != credentials.ErrConnDispatched { 756 s.mu.Lock() 757 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) 758 s.mu.Unlock() 759 channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) 760 rawConn.Close() 761 } 762 rawConn.SetDeadline(time.Time{}) 763 return 764 } 765 766 // Finish handshaking (HTTP2) 767 st := s.newHTTP2Transport(conn, authInfo) 768 if st == nil { 769 return 770 } 771 772 rawConn.SetDeadline(time.Time{}) 773 if !s.addConn(st) { 774 return 775 } 776 go func() { 777 s.serveStreams(st) 778 s.removeConn(st) 779 }() 780} 781 782// newHTTP2Transport sets up a http/2 transport (using the 783// gRPC http2 server transport in transport/http2_server.go). 
784func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { 785 config := &transport.ServerConfig{ 786 MaxStreams: s.opts.maxConcurrentStreams, 787 AuthInfo: authInfo, 788 InTapHandle: s.opts.inTapHandle, 789 StatsHandler: s.opts.statsHandler, 790 KeepaliveParams: s.opts.keepaliveParams, 791 KeepalivePolicy: s.opts.keepalivePolicy, 792 InitialWindowSize: s.opts.initialWindowSize, 793 InitialConnWindowSize: s.opts.initialConnWindowSize, 794 WriteBufferSize: s.opts.writeBufferSize, 795 ReadBufferSize: s.opts.readBufferSize, 796 ChannelzParentID: s.channelzID, 797 MaxHeaderListSize: s.opts.maxHeaderListSize, 798 HeaderTableSize: s.opts.headerTableSize, 799 } 800 st, err := transport.NewServerTransport("http2", c, config) 801 if err != nil { 802 s.mu.Lock() 803 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) 804 s.mu.Unlock() 805 c.Close() 806 channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) 807 return nil 808 } 809 810 return st 811} 812 813func (s *Server) serveStreams(st transport.ServerTransport) { 814 defer st.Close() 815 var wg sync.WaitGroup 816 817 var roundRobinCounter uint32 818 st.HandleStreams(func(stream *transport.Stream) { 819 wg.Add(1) 820 if s.opts.numServerWorkers > 0 { 821 data := &serverWorkerData{st: st, wg: &wg, stream: stream} 822 select { 823 case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data: 824 default: 825 // If all stream workers are busy, fallback to the default code path. 
826 go func() { 827 s.handleStream(st, stream, s.traceInfo(st, stream)) 828 wg.Done() 829 }() 830 } 831 } else { 832 go func() { 833 defer wg.Done() 834 s.handleStream(st, stream, s.traceInfo(st, stream)) 835 }() 836 } 837 }, func(ctx context.Context, method string) context.Context { 838 if !EnableTracing { 839 return ctx 840 } 841 tr := trace.New("grpc.Recv."+methodFamily(method), method) 842 return trace.NewContext(ctx, tr) 843 }) 844 wg.Wait() 845} 846 847var _ http.Handler = (*Server)(nil) 848 849// ServeHTTP implements the Go standard library's http.Handler 850// interface by responding to the gRPC request r, by looking up 851// the requested gRPC method in the gRPC server s. 852// 853// The provided HTTP request must have arrived on an HTTP/2 854// connection. When using the Go standard library's server, 855// practically this means that the Request must also have arrived 856// over TLS. 857// 858// To share one port (such as 443 for https) between gRPC and an 859// existing http.Handler, use a root http.Handler such as: 860// 861// if r.ProtoMajor == 2 && strings.HasPrefix( 862// r.Header.Get("Content-Type"), "application/grpc") { 863// grpcServer.ServeHTTP(w, r) 864// } else { 865// yourMux.ServeHTTP(w, r) 866// } 867// 868// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally 869// separate from grpc-go's HTTP/2 server. Performance and features may vary 870// between the two paths. ServeHTTP does not support some gRPC features 871// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL 872// and subject to change. 
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	// addConn refuses (and closes st) once the server's conn set is gone.
	if !s.addConn(st) {
		return
	}
	defer s.removeConn(st)
	s.serveStreams(st)
}

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
// It also returns nil when the stream's context carries no trace.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	if !EnableTracing {
		return nil
	}
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: st.RemoteAddr(),
		},
	}
	// Record the time remaining until the deadline, when the stream has one.
	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = time.Until(dl)
	}
	return trInfo
}

// addConn records st in s.conns under s.mu and reports whether it was added.
// If s.conns is nil the transport is closed and false is returned.
func (s *Server) addConn(st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		st.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain()
	}
	s.conns[st] = true
	return true
}

// removeConn forgets st and wakes every waiter on s.cv. It does nothing when
// s.conns is nil.
func (s *Server) removeConn(st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns != nil {
		delete(s.conns, st)
		s.cv.Broadcast()
	}
}

// channelzMetric returns a snapshot of the server's call counters, each read
// atomically. lastCallStartedTime is interpreted as Unix nanoseconds.
func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

// incrCallsStarted atomically bumps the started-call counter and stores the
// current time (Unix nanoseconds) as the last call start.
func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

// incrCallsSucceeded atomically bumps the succeeded-call counter.
func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

// incrCallsFailed atomically bumps the failed-call counter.
func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

// sendResponse encodes msg with the codec chosen for the stream's content
// subtype, compresses it, and writes the resulting header and payload to
// stream via t. It returns the encode, compress, or write error; a payload
// longer than opts.maxSendMessageSize is rejected with a ResourceExhausted
// status before anything is written. After a successful write the stats
// handler, if configured, is notified of the outgoing payload.
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one.
// The result replaces s.opts.unaryInt: nil when there are no interceptors,
// the interceptor itself when there is exactly one, and otherwise a wrapper
// that starts the chain at the first interceptor.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.unaryInt = chainedInt
}

// getChainUnaryHandler recursively generates the chained UnaryHandler: the
// handler passed to interceptors[curr] invokes interceptors[curr+1], and the
// last interceptor receives finalHandler.
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(ctx context.Context, req interface{}) (interface{}, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
s.incrCallsStarted() 1019 } 1020 var statsBegin *stats.Begin 1021 if sh != nil { 1022 beginTime := time.Now() 1023 statsBegin = &stats.Begin{ 1024 BeginTime: beginTime, 1025 } 1026 sh.HandleRPC(stream.Context(), statsBegin) 1027 } 1028 if trInfo != nil { 1029 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1030 } 1031 // The deferred error handling for tracing, stats handler and channelz are 1032 // combined into one function to reduce stack usage -- a defer takes ~56-64 1033 // bytes on the stack, so overflowing the stack will require a stack 1034 // re-allocation, which is expensive. 1035 // 1036 // To maintain behavior similar to separate deferred statements, statements 1037 // should be executed in the reverse order. That is, tracing first, stats 1038 // handler second, and channelz last. Note that panics *within* defers will 1039 // lead to different behavior, but that's an acceptable compromise; that 1040 // would be undefined behavior territory anyway. 1041 defer func() { 1042 if trInfo != nil { 1043 if err != nil && err != io.EOF { 1044 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1045 trInfo.tr.SetError() 1046 } 1047 trInfo.tr.Finish() 1048 } 1049 1050 if sh != nil { 1051 end := &stats.End{ 1052 BeginTime: statsBegin.BeginTime, 1053 EndTime: time.Now(), 1054 } 1055 if err != nil && err != io.EOF { 1056 end.Error = toRPCErr(err) 1057 } 1058 sh.HandleRPC(stream.Context(), end) 1059 } 1060 1061 if channelz.IsOn() { 1062 if err != nil && err != io.EOF { 1063 s.incrCallsFailed() 1064 } else { 1065 s.incrCallsSucceeded() 1066 } 1067 } 1068 }() 1069 } 1070 1071 binlog := binarylog.GetMethodLogger(stream.Method()) 1072 if binlog != nil { 1073 ctx := stream.Context() 1074 md, _ := metadata.FromIncomingContext(ctx) 1075 logEntry := &binarylog.ClientHeader{ 1076 Header: md, 1077 MethodName: stream.Method(), 1078 PeerAddr: nil, 1079 } 1080 if deadline, ok := ctx.Deadline(); ok { 1081 logEntry.Timeout = time.Until(deadline) 1082 if logEntry.Timeout < 0 { 
1083 logEntry.Timeout = 0 1084 } 1085 } 1086 if a := md[":authority"]; len(a) > 0 { 1087 logEntry.Authority = a[0] 1088 } 1089 if peer, ok := peer.FromContext(ctx); ok { 1090 logEntry.PeerAddr = peer.Addr 1091 } 1092 binlog.Log(logEntry) 1093 } 1094 1095 // comp and cp are used for compression. decomp and dc are used for 1096 // decompression. If comp and decomp are both set, they are the same; 1097 // however they are kept separate to ensure that at most one of the 1098 // compressor/decompressor variable pairs are set for use later. 1099 var comp, decomp encoding.Compressor 1100 var cp Compressor 1101 var dc Decompressor 1102 1103 // If dc is set and matches the stream's compression, use it. Otherwise, try 1104 // to find a matching registered compressor for decomp. 1105 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1106 dc = s.opts.dc 1107 } else if rc != "" && rc != encoding.Identity { 1108 decomp = encoding.GetCompressor(rc) 1109 if decomp == nil { 1110 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1111 t.WriteStatus(stream, st) 1112 return st.Err() 1113 } 1114 } 1115 1116 // If cp is set, use it. Otherwise, attempt to compress the response using 1117 // the incoming message compression method. 1118 // 1119 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1120 if s.opts.cp != nil { 1121 cp = s.opts.cp 1122 stream.SetSendCompress(cp.Type()) 1123 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1124 // Legacy compressor not specified; attempt to respond with same encoding. 
1125 comp = encoding.GetCompressor(rc) 1126 if comp != nil { 1127 stream.SetSendCompress(rc) 1128 } 1129 } 1130 1131 var payInfo *payloadInfo 1132 if sh != nil || binlog != nil { 1133 payInfo = &payloadInfo{} 1134 } 1135 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) 1136 if err != nil { 1137 if st, ok := status.FromError(err); ok { 1138 if e := t.WriteStatus(stream, st); e != nil { 1139 channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e) 1140 } 1141 } 1142 return err 1143 } 1144 if channelz.IsOn() { 1145 t.IncrMsgRecv() 1146 } 1147 df := func(v interface{}) error { 1148 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { 1149 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) 1150 } 1151 if sh != nil { 1152 sh.HandleRPC(stream.Context(), &stats.InPayload{ 1153 RecvTime: time.Now(), 1154 Payload: v, 1155 WireLength: payInfo.wireLength, 1156 Data: d, 1157 Length: len(d), 1158 }) 1159 } 1160 if binlog != nil { 1161 binlog.Log(&binarylog.ClientMessage{ 1162 Message: d, 1163 }) 1164 } 1165 if trInfo != nil { 1166 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) 1167 } 1168 return nil 1169 } 1170 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1171 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) 1172 if appErr != nil { 1173 appStatus, ok := status.FromError(appErr) 1174 if !ok { 1175 // Convert appErr if it is not a grpc status error. 
1176 appErr = status.Error(codes.Unknown, appErr.Error()) 1177 appStatus, _ = status.FromError(appErr) 1178 } 1179 if trInfo != nil { 1180 trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1181 trInfo.tr.SetError() 1182 } 1183 if e := t.WriteStatus(stream, appStatus); e != nil { 1184 channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) 1185 } 1186 if binlog != nil { 1187 if h, _ := stream.Header(); h.Len() > 0 { 1188 // Only log serverHeader if there was header. Otherwise it can 1189 // be trailer only. 1190 binlog.Log(&binarylog.ServerHeader{ 1191 Header: h, 1192 }) 1193 } 1194 binlog.Log(&binarylog.ServerTrailer{ 1195 Trailer: stream.Trailer(), 1196 Err: appErr, 1197 }) 1198 } 1199 return appErr 1200 } 1201 if trInfo != nil { 1202 trInfo.tr.LazyLog(stringer("OK"), false) 1203 } 1204 opts := &transport.Options{Last: true} 1205 1206 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { 1207 if err == io.EOF { 1208 // The entire stream is done (for unary RPC only). 1209 return err 1210 } 1211 if sts, ok := status.FromError(err); ok { 1212 if e := t.WriteStatus(stream, sts); e != nil { 1213 channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) 1214 } 1215 } else { 1216 switch st := err.(type) { 1217 case transport.ConnectionError: 1218 // Nothing to do here. 
1219 default: 1220 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) 1221 } 1222 } 1223 if binlog != nil { 1224 h, _ := stream.Header() 1225 binlog.Log(&binarylog.ServerHeader{ 1226 Header: h, 1227 }) 1228 binlog.Log(&binarylog.ServerTrailer{ 1229 Trailer: stream.Trailer(), 1230 Err: appErr, 1231 }) 1232 } 1233 return err 1234 } 1235 if binlog != nil { 1236 h, _ := stream.Header() 1237 binlog.Log(&binarylog.ServerHeader{ 1238 Header: h, 1239 }) 1240 binlog.Log(&binarylog.ServerMessage{ 1241 Message: reply, 1242 }) 1243 } 1244 if channelz.IsOn() { 1245 t.IncrMsgSent() 1246 } 1247 if trInfo != nil { 1248 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) 1249 } 1250 // TODO: Should we be logging if writing status failed here, like above? 1251 // Should the logging be in WriteStatus? Should we ignore the WriteStatus 1252 // error or allow the stats handler to see it? 1253 err = t.WriteStatus(stream, statusOK) 1254 if binlog != nil { 1255 binlog.Log(&binarylog.ServerTrailer{ 1256 Trailer: stream.Trailer(), 1257 Err: appErr, 1258 }) 1259 } 1260 return err 1261} 1262 1263// chainStreamServerInterceptors chains all stream server interceptors into one. 1264func chainStreamServerInterceptors(s *Server) { 1265 // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will 1266 // be executed before any other chained interceptors. 1267 interceptors := s.opts.chainStreamInts 1268 if s.opts.streamInt != nil { 1269 interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...) 
1270 } 1271 1272 var chainedInt StreamServerInterceptor 1273 if len(interceptors) == 0 { 1274 chainedInt = nil 1275 } else if len(interceptors) == 1 { 1276 chainedInt = interceptors[0] 1277 } else { 1278 chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error { 1279 return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler)) 1280 } 1281 } 1282 1283 s.opts.streamInt = chainedInt 1284} 1285 1286// getChainStreamHandler recursively generate the chained StreamHandler 1287func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler { 1288 if curr == len(interceptors)-1 { 1289 return finalHandler 1290 } 1291 1292 return func(srv interface{}, ss ServerStream) error { 1293 return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler)) 1294 } 1295} 1296 1297func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { 1298 if channelz.IsOn() { 1299 s.incrCallsStarted() 1300 } 1301 sh := s.opts.statsHandler 1302 var statsBegin *stats.Begin 1303 if sh != nil { 1304 beginTime := time.Now() 1305 statsBegin = &stats.Begin{ 1306 BeginTime: beginTime, 1307 } 1308 sh.HandleRPC(stream.Context(), statsBegin) 1309 } 1310 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1311 ss := &serverStream{ 1312 ctx: ctx, 1313 t: t, 1314 s: stream, 1315 p: &parser{r: stream}, 1316 codec: s.getCodec(stream.ContentSubtype()), 1317 maxReceiveMessageSize: s.opts.maxReceiveMessageSize, 1318 maxSendMessageSize: s.opts.maxSendMessageSize, 1319 trInfo: trInfo, 1320 statsHandler: sh, 1321 } 1322 1323 if sh != nil || trInfo != nil || channelz.IsOn() { 1324 // See comment in processUnaryRPC on defers. 
1325 defer func() { 1326 if trInfo != nil { 1327 ss.mu.Lock() 1328 if err != nil && err != io.EOF { 1329 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1330 ss.trInfo.tr.SetError() 1331 } 1332 ss.trInfo.tr.Finish() 1333 ss.trInfo.tr = nil 1334 ss.mu.Unlock() 1335 } 1336 1337 if sh != nil { 1338 end := &stats.End{ 1339 BeginTime: statsBegin.BeginTime, 1340 EndTime: time.Now(), 1341 } 1342 if err != nil && err != io.EOF { 1343 end.Error = toRPCErr(err) 1344 } 1345 sh.HandleRPC(stream.Context(), end) 1346 } 1347 1348 if channelz.IsOn() { 1349 if err != nil && err != io.EOF { 1350 s.incrCallsFailed() 1351 } else { 1352 s.incrCallsSucceeded() 1353 } 1354 } 1355 }() 1356 } 1357 1358 ss.binlog = binarylog.GetMethodLogger(stream.Method()) 1359 if ss.binlog != nil { 1360 md, _ := metadata.FromIncomingContext(ctx) 1361 logEntry := &binarylog.ClientHeader{ 1362 Header: md, 1363 MethodName: stream.Method(), 1364 PeerAddr: nil, 1365 } 1366 if deadline, ok := ctx.Deadline(); ok { 1367 logEntry.Timeout = time.Until(deadline) 1368 if logEntry.Timeout < 0 { 1369 logEntry.Timeout = 0 1370 } 1371 } 1372 if a := md[":authority"]; len(a) > 0 { 1373 logEntry.Authority = a[0] 1374 } 1375 if peer, ok := peer.FromContext(ss.Context()); ok { 1376 logEntry.PeerAddr = peer.Addr 1377 } 1378 ss.binlog.Log(logEntry) 1379 } 1380 1381 // If dc is set and matches the stream's compression, use it. Otherwise, try 1382 // to find a matching registered compressor for decomp. 1383 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1384 ss.dc = s.opts.dc 1385 } else if rc != "" && rc != encoding.Identity { 1386 ss.decomp = encoding.GetCompressor(rc) 1387 if ss.decomp == nil { 1388 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1389 t.WriteStatus(ss.s, st) 1390 return st.Err() 1391 } 1392 } 1393 1394 // If cp is set, use it. 
Otherwise, attempt to compress the response using 1395 // the incoming message compression method. 1396 // 1397 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1398 if s.opts.cp != nil { 1399 ss.cp = s.opts.cp 1400 stream.SetSendCompress(s.opts.cp.Type()) 1401 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1402 // Legacy compressor not specified; attempt to respond with same encoding. 1403 ss.comp = encoding.GetCompressor(rc) 1404 if ss.comp != nil { 1405 stream.SetSendCompress(rc) 1406 } 1407 } 1408 1409 if trInfo != nil { 1410 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1411 } 1412 var appErr error 1413 var server interface{} 1414 if srv != nil { 1415 server = srv.server 1416 } 1417 if s.opts.streamInt == nil { 1418 appErr = sd.Handler(server, ss) 1419 } else { 1420 info := &StreamServerInfo{ 1421 FullMethod: stream.Method(), 1422 IsClientStream: sd.ClientStreams, 1423 IsServerStream: sd.ServerStreams, 1424 } 1425 appErr = s.opts.streamInt(server, ss, info, sd.Handler) 1426 } 1427 if appErr != nil { 1428 appStatus, ok := status.FromError(appErr) 1429 if !ok { 1430 appStatus = status.New(codes.Unknown, appErr.Error()) 1431 appErr = appStatus.Err() 1432 } 1433 if trInfo != nil { 1434 ss.mu.Lock() 1435 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1436 ss.trInfo.tr.SetError() 1437 ss.mu.Unlock() 1438 } 1439 t.WriteStatus(ss.s, appStatus) 1440 if ss.binlog != nil { 1441 ss.binlog.Log(&binarylog.ServerTrailer{ 1442 Trailer: ss.s.Trailer(), 1443 Err: appErr, 1444 }) 1445 } 1446 // TODO: Should we log an error from WriteStatus here and below? 
1447 return appErr 1448 } 1449 if trInfo != nil { 1450 ss.mu.Lock() 1451 ss.trInfo.tr.LazyLog(stringer("OK"), false) 1452 ss.mu.Unlock() 1453 } 1454 err = t.WriteStatus(ss.s, statusOK) 1455 if ss.binlog != nil { 1456 ss.binlog.Log(&binarylog.ServerTrailer{ 1457 Trailer: ss.s.Trailer(), 1458 Err: appErr, 1459 }) 1460 } 1461 return err 1462} 1463 1464func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { 1465 sm := stream.Method() 1466 if sm != "" && sm[0] == '/' { 1467 sm = sm[1:] 1468 } 1469 pos := strings.LastIndex(sm, "/") 1470 if pos == -1 { 1471 if trInfo != nil { 1472 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) 1473 trInfo.tr.SetError() 1474 } 1475 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) 1476 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil { 1477 if trInfo != nil { 1478 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1479 trInfo.tr.SetError() 1480 } 1481 channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) 1482 } 1483 if trInfo != nil { 1484 trInfo.tr.Finish() 1485 } 1486 return 1487 } 1488 service := sm[:pos] 1489 method := sm[pos+1:] 1490 1491 srv, knownService := s.m[service] 1492 if knownService { 1493 if md, ok := srv.md[method]; ok { 1494 s.processUnaryRPC(t, stream, srv, md, trInfo) 1495 return 1496 } 1497 if sd, ok := srv.sd[method]; ok { 1498 s.processStreamingRPC(t, stream, srv, sd, trInfo) 1499 return 1500 } 1501 } 1502 // Unknown service, or known server unknown method. 
1503 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1504 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1505 return 1506 } 1507 var errDesc string 1508 if !knownService { 1509 errDesc = fmt.Sprintf("unknown service %v", service) 1510 } else { 1511 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service) 1512 } 1513 if trInfo != nil { 1514 trInfo.tr.LazyPrintf("%s", errDesc) 1515 trInfo.tr.SetError() 1516 } 1517 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1518 if trInfo != nil { 1519 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1520 trInfo.tr.SetError() 1521 } 1522 channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) 1523 } 1524 if trInfo != nil { 1525 trInfo.tr.Finish() 1526 } 1527} 1528 1529// The key to save ServerTransportStream in the context. 1530type streamKey struct{} 1531 1532// NewContextWithServerTransportStream creates a new context from ctx and 1533// attaches stream to it. 1534// 1535// This API is EXPERIMENTAL. 1536func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { 1537 return context.WithValue(ctx, streamKey{}, stream) 1538} 1539 1540// ServerTransportStream is a minimal interface that a transport stream must 1541// implement. This can be used to mock an actual transport stream for tests of 1542// handler code that use, for example, grpc.SetHeader (which requires some 1543// stream to be in context). 1544// 1545// See also NewContextWithServerTransportStream. 1546// 1547// This API is EXPERIMENTAL. 1548type ServerTransportStream interface { 1549 Method() string 1550 SetHeader(md metadata.MD) error 1551 SendHeader(md metadata.MD) error 1552 SetTrailer(md metadata.MD) error 1553} 1554 1555// ServerTransportStreamFromContext returns the ServerTransportStream saved in 1556// ctx. 
// Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// This API is EXPERIMENTAL.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	// A missing value or a value of another type both yield a nil stream.
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quit.Fire()

	// On the way out, wait for the Serve goroutines counted by serveWG and
	// only then fire the done event.
	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	// The channelz entry is removed at most once, whether Stop or
	// GracefulStop gets here first.
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	// Detach the listener and connection sets under the lock; they are closed
	// below without holding it. Once s.conns is nil, addConn closes and
	// rejects any transport handed to it.
	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}
	if s.opts.numServerWorkers > 0 {
		s.stopServerWorkers()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quit.Fire()
	defer s.done.Fire()

	// The channelz entry is removed at most once, whether Stop or
	// GracefulStop gets here first.
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	// A nil connection set means the server has already been stopped (Stop
	// and the end of this function both set it to nil); nothing to do.
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	// Drain the existing transports once; s.drain also makes addConn drain
	// any transport registered from now on.
	if !s.drain {
		for st := range s.conns {
			st.Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	// removeConn broadcasts on s.cv after each removal; Stop broadcasts after
	// setting s.conns to nil, which also ends this loop (len of nil map is 0).
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// getCodec picks the codec used for a stream with the given content subtype.
//
// A codec configured in s.opts.codec always wins. Otherwise the codec
// registered under contentSubtype is used, falling back to the proto codec
// when contentSubtype is empty or has no registered codec.
//
// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
//  - grpc.SendHeader() is called;
//  - The first response is sent out;
//  - An RPC status is sent out (error or success).
//
// Empty metadata is a no-op and returns nil without looking up the stream.
// An Internal status error is returned if ctx carries no
// ServerTransportStream.
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}

// SendHeader sends header metadata. It may be called at most once.
1691// The provided md and headers set by SetHeader() will be sent. 1692func SendHeader(ctx context.Context, md metadata.MD) error { 1693 stream := ServerTransportStreamFromContext(ctx) 1694 if stream == nil { 1695 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1696 } 1697 if err := stream.SendHeader(md); err != nil { 1698 return toRPCErr(err) 1699 } 1700 return nil 1701} 1702 1703// SetTrailer sets the trailer metadata that will be sent when an RPC returns. 1704// When called more than once, all the provided metadata will be merged. 1705func SetTrailer(ctx context.Context, md metadata.MD) error { 1706 if md.Len() == 0 { 1707 return nil 1708 } 1709 stream := ServerTransportStreamFromContext(ctx) 1710 if stream == nil { 1711 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1712 } 1713 return stream.SetTrailer(md) 1714} 1715 1716// Method returns the method string for the server context. The returned 1717// string is in the format of "/service/method". 1718func Method(ctx context.Context) (string, bool) { 1719 s := ServerTransportStreamFromContext(ctx) 1720 if s == nil { 1721 return "", false 1722 } 1723 return s.Method(), true 1724} 1725 1726type channelzServer struct { 1727 s *Server 1728} 1729 1730func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { 1731 return c.s.channelzMetric() 1732} 1733