1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "bytes" 23 "errors" 24 "fmt" 25 "io" 26 "math" 27 "net" 28 "net/http" 29 "reflect" 30 "runtime" 31 "strings" 32 "sync" 33 "time" 34 35 "io/ioutil" 36 37 "golang.org/x/net/context" 38 "golang.org/x/net/http2" 39 "golang.org/x/net/trace" 40 41 "google.golang.org/grpc/channelz" 42 "google.golang.org/grpc/codes" 43 "google.golang.org/grpc/credentials" 44 "google.golang.org/grpc/encoding" 45 "google.golang.org/grpc/encoding/proto" 46 "google.golang.org/grpc/grpclog" 47 "google.golang.org/grpc/internal" 48 "google.golang.org/grpc/keepalive" 49 "google.golang.org/grpc/metadata" 50 "google.golang.org/grpc/stats" 51 "google.golang.org/grpc/status" 52 "google.golang.org/grpc/tap" 53 "google.golang.org/grpc/transport" 54) 55 56const ( 57 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 58 defaultServerMaxSendMessageSize = math.MaxInt32 59) 60 61type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) 62 63// MethodDesc represents an RPC service's method specification. 64type MethodDesc struct { 65 MethodName string 66 Handler methodHandler 67} 68 69// ServiceDesc represents an RPC service's specification. 70type ServiceDesc struct { 71 ServiceName string 72 // The pointer to the service interface. Used to check whether the user 73 // provided implementation satisfies the interface requirements. 74 HandlerType interface{} 75 Methods []MethodDesc 76 Streams []StreamDesc 77 Metadata interface{} 78} 79 80// service consists of the information of the server serving this service and 81// the methods in this service. 82type service struct { 83 server interface{} // the server for service methods 84 md map[string]*MethodDesc 85 sd map[string]*StreamDesc 86 mdata interface{} 87} 88 89// Server is a gRPC server to serve RPC requests. 90type Server struct { 91 opts options 92 93 mu sync.Mutex // guards following 94 lis map[net.Listener]bool 95 conns map[io.Closer]bool 96 serve bool 97 drain bool 98 cv *sync.Cond // signaled when connections close for GracefulStop 99 m map[string]*service // service name -> service info 100 events trace.EventLog 101 102 quit chan struct{} 103 done chan struct{} 104 quitOnce sync.Once 105 doneOnce sync.Once 106 channelzRemoveOnce sync.Once 107 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop 108 109 channelzID int64 // channelz unique identification number 110 czmu sync.RWMutex 111 callsStarted int64 112 callsFailed int64 113 callsSucceeded int64 114 lastCallStartedTime time.Time 115} 116 117type options struct { 118 creds credentials.TransportCredentials 119 codec baseCodec 120 cp Compressor 121 dc Decompressor 122 unaryInt UnaryServerInterceptor 123 streamInt StreamServerInterceptor 124 inTapHandle tap.ServerInHandle 125 statsHandler stats.Handler 126 maxConcurrentStreams uint32 127 maxReceiveMessageSize int 128 maxSendMessageSize int 129 useHandlerImpl bool // use http.Handler-based server 130 unknownStreamDesc *StreamDesc 131 keepaliveParams keepalive.ServerParameters 132 keepalivePolicy keepalive.EnforcementPolicy 133 initialWindowSize int32 134 initialConnWindowSize int32 135 writeBufferSize int 136 readBufferSize int 137 connectionTimeout time.Duration 138} 139 140var defaultServerOptions = options{ 141 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, 142 maxSendMessageSize: defaultServerMaxSendMessageSize, 143 connectionTimeout: 120 * time.Second, 144} 145 146// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. 147type ServerOption func(*options) 148 149// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched 150// before doing a write on the wire. 151func WriteBufferSize(s int) ServerOption { 152 return func(o *options) { 153 o.writeBufferSize = s 154 } 155} 156 157// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 158// for one read syscall. 159func ReadBufferSize(s int) ServerOption { 160 return func(o *options) { 161 o.readBufferSize = s 162 } 163} 164 165// InitialWindowSize returns a ServerOption that sets window size for stream. 166// The lower bound for window size is 64K and any value smaller than that will be ignored. 167func InitialWindowSize(s int32) ServerOption { 168 return func(o *options) { 169 o.initialWindowSize = s 170 } 171} 172 173// InitialConnWindowSize returns a ServerOption that sets window size for a connection. 174// The lower bound for window size is 64K and any value smaller than that will be ignored. 175func InitialConnWindowSize(s int32) ServerOption { 176 return func(o *options) { 177 o.initialConnWindowSize = s 178 } 179} 180 181// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 182func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 183 return func(o *options) { 184 o.keepaliveParams = kp 185 } 186} 187 188// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 189func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 190 return func(o *options) { 191 o.keepalivePolicy = kep 192 } 193} 194 195// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 196// 197// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 198func CustomCodec(codec Codec) ServerOption { 199 return func(o *options) { 200 o.codec = codec 201 } 202} 203 204// RPCCompressor returns a ServerOption that sets a compressor for outbound 205// messages. For backward compatibility, all outbound messages will be sent 206// using this compressor, regardless of incoming message compression. By 207// default, server messages will be sent using the same compressor with which 208// request messages were sent. 209// 210// Deprecated: use encoding.RegisterCompressor instead. 211func RPCCompressor(cp Compressor) ServerOption { 212 return func(o *options) { 213 o.cp = cp 214 } 215} 216 217// RPCDecompressor returns a ServerOption that sets a decompressor for inbound 218// messages. It has higher priority than decompressors registered via 219// encoding.RegisterCompressor. 220// 221// Deprecated: use encoding.RegisterCompressor instead. 222func RPCDecompressor(dc Decompressor) ServerOption { 223 return func(o *options) { 224 o.dc = dc 225 } 226} 227 228// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 229// If this is not set, gRPC uses the default limit. 230// 231// Deprecated: use MaxRecvMsgSize instead. 232func MaxMsgSize(m int) ServerOption { 233 return MaxRecvMsgSize(m) 234} 235 236// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 237// If this is not set, gRPC uses the default 4MB. 238func MaxRecvMsgSize(m int) ServerOption { 239 return func(o *options) { 240 o.maxReceiveMessageSize = m 241 } 242} 243 244// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 245// If this is not set, gRPC uses the default 4MB. 246func MaxSendMsgSize(m int) ServerOption { 247 return func(o *options) { 248 o.maxSendMessageSize = m 249 } 250} 251 252// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 253// of concurrent streams to each ServerTransport. 254func MaxConcurrentStreams(n uint32) ServerOption { 255 return func(o *options) { 256 o.maxConcurrentStreams = n 257 } 258} 259 260// Creds returns a ServerOption that sets credentials for server connections. 261func Creds(c credentials.TransportCredentials) ServerOption { 262 return func(o *options) { 263 o.creds = c 264 } 265} 266 267// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 268// server. Only one unary interceptor can be installed. The construction of multiple 269// interceptors (e.g., chaining) can be implemented at the caller. 270func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { 271 return func(o *options) { 272 if o.unaryInt != nil { 273 panic("The unary server interceptor was already set and may not be reset.") 274 } 275 o.unaryInt = i 276 } 277} 278 279// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the 280// server. Only one stream interceptor can be installed. 281func StreamInterceptor(i StreamServerInterceptor) ServerOption { 282 return func(o *options) { 283 if o.streamInt != nil { 284 panic("The stream server interceptor was already set and may not be reset.") 285 } 286 o.streamInt = i 287 } 288} 289 290// InTapHandle returns a ServerOption that sets the tap handle for all the server 291// transport to be created. Only one can be installed. 292func InTapHandle(h tap.ServerInHandle) ServerOption { 293 return func(o *options) { 294 if o.inTapHandle != nil { 295 panic("The tap handle was already set and may not be reset.") 296 } 297 o.inTapHandle = h 298 } 299} 300 301// StatsHandler returns a ServerOption that sets the stats handler for the server. 302func StatsHandler(h stats.Handler) ServerOption { 303 return func(o *options) { 304 o.statsHandler = h 305 } 306} 307 308// UnknownServiceHandler returns a ServerOption that allows for adding a custom 309// unknown service handler. The provided method is a bidi-streaming RPC service 310// handler that will be invoked instead of returning the "unimplemented" gRPC 311// error whenever a request is received for an unregistered service or method. 312// The handling function has full access to the Context of the request and the 313// stream, and the invocation bypasses interceptors. 314func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 315 return func(o *options) { 316 o.unknownStreamDesc = &StreamDesc{ 317 StreamName: "unknown_service_handler", 318 Handler: streamHandler, 319 // We need to assume that the users of the streamHandler will want to use both. 320 ClientStreams: true, 321 ServerStreams: true, 322 } 323 } 324} 325 326// ConnectionTimeout returns a ServerOption that sets the timeout for 327// connection establishment (up to and including HTTP/2 handshaking) for all 328// new connections. If this is not set, the default is 120 seconds. A zero or 329// negative value will result in an immediate timeout. 330// 331// This API is EXPERIMENTAL. 332func ConnectionTimeout(d time.Duration) ServerOption { 333 return func(o *options) { 334 o.connectionTimeout = d 335 } 336} 337 338// NewServer creates a gRPC server which has no service registered and has not 339// started to accept requests yet. 340func NewServer(opt ...ServerOption) *Server { 341 opts := defaultServerOptions 342 for _, o := range opt { 343 o(&opts) 344 } 345 s := &Server{ 346 lis: make(map[net.Listener]bool), 347 opts: opts, 348 conns: make(map[io.Closer]bool), 349 m: make(map[string]*service), 350 quit: make(chan struct{}), 351 done: make(chan struct{}), 352 } 353 s.cv = sync.NewCond(&s.mu) 354 if EnableTracing { 355 _, file, line, _ := runtime.Caller(1) 356 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 357 } 358 359 if channelz.IsOn() { 360 s.channelzID = channelz.RegisterServer(s, "") 361 } 362 return s 363} 364 365// printf records an event in s's event log, unless s has been stopped. 366// REQUIRES s.mu is held. 367func (s *Server) printf(format string, a ...interface{}) { 368 if s.events != nil { 369 s.events.Printf(format, a...) 370 } 371} 372 373// errorf records an error in s's event log, unless s has been stopped. 374// REQUIRES s.mu is held. 375func (s *Server) errorf(format string, a ...interface{}) { 376 if s.events != nil { 377 s.events.Errorf(format, a...) 378 } 379} 380 381// RegisterService registers a service and its implementation to the gRPC 382// server. It is called from the IDL generated code. This must be called before 383// invoking Serve. 384func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { 385 ht := reflect.TypeOf(sd.HandlerType).Elem() 386 st := reflect.TypeOf(ss) 387 if !st.Implements(ht) { 388 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) 389 } 390 s.register(sd, ss) 391} 392 393func (s *Server) register(sd *ServiceDesc, ss interface{}) { 394 s.mu.Lock() 395 defer s.mu.Unlock() 396 s.printf("RegisterService(%q)", sd.ServiceName) 397 if s.serve { 398 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) 399 } 400 if _, ok := s.m[sd.ServiceName]; ok { 401 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) 402 } 403 srv := &service{ 404 server: ss, 405 md: make(map[string]*MethodDesc), 406 sd: make(map[string]*StreamDesc), 407 mdata: sd.Metadata, 408 } 409 for i := range sd.Methods { 410 d := &sd.Methods[i] 411 srv.md[d.MethodName] = d 412 } 413 for i := range sd.Streams { 414 d := &sd.Streams[i] 415 srv.sd[d.StreamName] = d 416 } 417 s.m[sd.ServiceName] = srv 418} 419 420// MethodInfo contains the information of an RPC including its method name and type. 421type MethodInfo struct { 422 // Name is the method name only, without the service name or package name. 423 Name string 424 // IsClientStream indicates whether the RPC is a client streaming RPC. 425 IsClientStream bool 426 // IsServerStream indicates whether the RPC is a server streaming RPC. 427 IsServerStream bool 428} 429 430// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. 431type ServiceInfo struct { 432 Methods []MethodInfo 433 // Metadata is the metadata specified in ServiceDesc when registering service. 434 Metadata interface{} 435} 436 437// GetServiceInfo returns a map from service names to ServiceInfo. 438// Service names include the package names, in the form of <package>.<service>. 439func (s *Server) GetServiceInfo() map[string]ServiceInfo { 440 ret := make(map[string]ServiceInfo) 441 for n, srv := range s.m { 442 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) 443 for m := range srv.md { 444 methods = append(methods, MethodInfo{ 445 Name: m, 446 IsClientStream: false, 447 IsServerStream: false, 448 }) 449 } 450 for m, d := range srv.sd { 451 methods = append(methods, MethodInfo{ 452 Name: m, 453 IsClientStream: d.ClientStreams, 454 IsServerStream: d.ServerStreams, 455 }) 456 } 457 458 ret[n] = ServiceInfo{ 459 Methods: methods, 460 Metadata: srv.mdata, 461 } 462 } 463 return ret 464} 465 466// ErrServerStopped indicates that the operation is now illegal because of 467// the server being stopped. 468var ErrServerStopped = errors.New("grpc: the server has been stopped") 469 470func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 471 if s.opts.creds == nil { 472 return rawConn, nil, nil 473 } 474 return s.opts.creds.ServerHandshake(rawConn) 475} 476 477type listenSocket struct { 478 net.Listener 479 channelzID int64 480} 481 482func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { 483 return &channelz.SocketInternalMetric{ 484 LocalAddr: l.Listener.Addr(), 485 } 486} 487 488func (l *listenSocket) Close() error { 489 err := l.Listener.Close() 490 if channelz.IsOn() { 491 channelz.RemoveEntry(l.channelzID) 492 } 493 return err 494} 495 496// Serve accepts incoming connections on the listener lis, creating a new 497// ServerTransport and service goroutine for each. The service goroutines 498// read gRPC requests and then call the registered handlers to reply to them. 499// Serve returns when lis.Accept fails with fatal errors. lis will be closed when 500// this method returns. 501// Serve will return a non-nil error unless Stop or GracefulStop is called. 502func (s *Server) Serve(lis net.Listener) error { 503 s.mu.Lock() 504 s.printf("serving") 505 s.serve = true 506 if s.lis == nil { 507 // Serve called after Stop or GracefulStop. 508 s.mu.Unlock() 509 lis.Close() 510 return ErrServerStopped 511 } 512 513 s.serveWG.Add(1) 514 defer func() { 515 s.serveWG.Done() 516 select { 517 // Stop or GracefulStop called; block until done and return nil. 518 case <-s.quit: 519 <-s.done 520 default: 521 } 522 }() 523 524 ls := &listenSocket{Listener: lis} 525 s.lis[ls] = true 526 527 if channelz.IsOn() { 528 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "") 529 } 530 s.mu.Unlock() 531 532 defer func() { 533 s.mu.Lock() 534 if s.lis != nil && s.lis[ls] { 535 ls.Close() 536 delete(s.lis, ls) 537 } 538 s.mu.Unlock() 539 }() 540 541 var tempDelay time.Duration // how long to sleep on accept failure 542 543 for { 544 rawConn, err := lis.Accept() 545 if err != nil { 546 if ne, ok := err.(interface { 547 Temporary() bool 548 }); ok && ne.Temporary() { 549 if tempDelay == 0 { 550 tempDelay = 5 * time.Millisecond 551 } else { 552 tempDelay *= 2 553 } 554 if max := 1 * time.Second; tempDelay > max { 555 tempDelay = max 556 } 557 s.mu.Lock() 558 s.printf("Accept error: %v; retrying in %v", err, tempDelay) 559 s.mu.Unlock() 560 timer := time.NewTimer(tempDelay) 561 select { 562 case <-timer.C: 563 case <-s.quit: 564 timer.Stop() 565 return nil 566 } 567 continue 568 } 569 s.mu.Lock() 570 s.printf("done serving; Accept = %v", err) 571 s.mu.Unlock() 572 573 select { 574 case <-s.quit: 575 return nil 576 default: 577 } 578 return err 579 } 580 tempDelay = 0 581 // Start a new goroutine to deal with rawConn so we don't stall this Accept 582 // loop goroutine. 583 // 584 // Make sure we account for the goroutine so GracefulStop doesn't nil out 585 // s.conns before this conn can be added. 586 s.serveWG.Add(1) 587 go func() { 588 s.handleRawConn(rawConn) 589 s.serveWG.Done() 590 }() 591 } 592} 593 594// handleRawConn forks a goroutine to handle a just-accepted connection that 595// has not had any I/O performed on it yet. 596func (s *Server) handleRawConn(rawConn net.Conn) { 597 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) 598 conn, authInfo, err := s.useTransportAuthenticator(rawConn) 599 if err != nil { 600 s.mu.Lock() 601 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) 602 s.mu.Unlock() 603 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) 604 // If serverHandshake returns ErrConnDispatched, keep rawConn open. 605 if err != credentials.ErrConnDispatched { 606 rawConn.Close() 607 } 608 rawConn.SetDeadline(time.Time{}) 609 return 610 } 611 612 s.mu.Lock() 613 if s.conns == nil { 614 s.mu.Unlock() 615 conn.Close() 616 return 617 } 618 s.mu.Unlock() 619 620 var serve func() 621 c := conn.(io.Closer) 622 if s.opts.useHandlerImpl { 623 serve = func() { s.serveUsingHandler(conn) } 624 } else { 625 // Finish handshaking (HTTP2) 626 st := s.newHTTP2Transport(conn, authInfo) 627 if st == nil { 628 return 629 } 630 c = st 631 serve = func() { s.serveStreams(st) } 632 } 633 634 rawConn.SetDeadline(time.Time{}) 635 if !s.addConn(c) { 636 return 637 } 638 go func() { 639 serve() 640 s.removeConn(c) 641 }() 642} 643 644// newHTTP2Transport sets up a http/2 transport (using the 645// gRPC http2 server transport in transport/http2_server.go). 646func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { 647 config := &transport.ServerConfig{ 648 MaxStreams: s.opts.maxConcurrentStreams, 649 AuthInfo: authInfo, 650 InTapHandle: s.opts.inTapHandle, 651 StatsHandler: s.opts.statsHandler, 652 KeepaliveParams: s.opts.keepaliveParams, 653 KeepalivePolicy: s.opts.keepalivePolicy, 654 InitialWindowSize: s.opts.initialWindowSize, 655 InitialConnWindowSize: s.opts.initialConnWindowSize, 656 WriteBufferSize: s.opts.writeBufferSize, 657 ReadBufferSize: s.opts.readBufferSize, 658 ChannelzParentID: s.channelzID, 659 } 660 st, err := transport.NewServerTransport("http2", c, config) 661 if err != nil { 662 s.mu.Lock() 663 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) 664 s.mu.Unlock() 665 c.Close() 666 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) 667 return nil 668 } 669 670 return st 671} 672 673func (s *Server) serveStreams(st transport.ServerTransport) { 674 defer st.Close() 675 var wg sync.WaitGroup 676 st.HandleStreams(func(stream *transport.Stream) { 677 wg.Add(1) 678 go func() { 679 defer wg.Done() 680 s.handleStream(st, stream, s.traceInfo(st, stream)) 681 }() 682 }, func(ctx context.Context, method string) context.Context { 683 if !EnableTracing { 684 return ctx 685 } 686 tr := trace.New("grpc.Recv."+methodFamily(method), method) 687 return trace.NewContext(ctx, tr) 688 }) 689 wg.Wait() 690} 691 692var _ http.Handler = (*Server)(nil) 693 694// serveUsingHandler is called from handleRawConn when s is configured 695// to handle requests via the http.Handler interface. It sets up a 696// net/http.Server to handle the just-accepted conn. The http.Server 697// is configured to route all incoming requests (all HTTP/2 streams) 698// to ServeHTTP, which creates a new ServerTransport for each stream. 699// serveUsingHandler blocks until conn closes. 700// 701// This codepath is only used when Server.TestingUseHandlerImpl has 702// been configured. This lets the end2end tests exercise the ServeHTTP 703// method as one of the environment types. 704// 705// conn is the *tls.Conn that's already been authenticated. 706func (s *Server) serveUsingHandler(conn net.Conn) { 707 h2s := &http2.Server{ 708 MaxConcurrentStreams: s.opts.maxConcurrentStreams, 709 } 710 h2s.ServeConn(conn, &http2.ServeConnOpts{ 711 Handler: s, 712 }) 713} 714 715// ServeHTTP implements the Go standard library's http.Handler 716// interface by responding to the gRPC request r, by looking up 717// the requested gRPC method in the gRPC server s. 718// 719// The provided HTTP request must have arrived on an HTTP/2 720// connection. When using the Go standard library's server, 721// practically this means that the Request must also have arrived 722// over TLS. 723// 724// To share one port (such as 443 for https) between gRPC and an 725// existing http.Handler, use a root http.Handler such as: 726// 727// if r.ProtoMajor == 2 && strings.HasPrefix( 728// r.Header.Get("Content-Type"), "application/grpc") { 729// grpcServer.ServeHTTP(w, r) 730// } else { 731// yourMux.ServeHTTP(w, r) 732// } 733// 734// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally 735// separate from grpc-go's HTTP/2 server. Performance and features may vary 736// between the two paths. ServeHTTP does not support some gRPC features 737// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL 738// and subject to change. 739func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 740 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) 741 if err != nil { 742 http.Error(w, err.Error(), http.StatusInternalServerError) 743 return 744 } 745 if !s.addConn(st) { 746 return 747 } 748 defer s.removeConn(st) 749 s.serveStreams(st) 750} 751 752// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. 753// If tracing is not enabled, it returns nil. 754func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { 755 tr, ok := trace.FromContext(stream.Context()) 756 if !ok { 757 return nil 758 } 759 760 trInfo = &traceInfo{ 761 tr: tr, 762 } 763 trInfo.firstLine.client = false 764 trInfo.firstLine.remoteAddr = st.RemoteAddr() 765 766 if dl, ok := stream.Context().Deadline(); ok { 767 trInfo.firstLine.deadline = dl.Sub(time.Now()) 768 } 769 return trInfo 770} 771 772func (s *Server) addConn(c io.Closer) bool { 773 s.mu.Lock() 774 defer s.mu.Unlock() 775 if s.conns == nil { 776 c.Close() 777 return false 778 } 779 if s.drain { 780 // Transport added after we drained our existing conns: drain it 781 // immediately. 782 c.(transport.ServerTransport).Drain() 783 } 784 s.conns[c] = true 785 return true 786} 787 788func (s *Server) removeConn(c io.Closer) { 789 s.mu.Lock() 790 defer s.mu.Unlock() 791 if s.conns != nil { 792 delete(s.conns, c) 793 s.cv.Broadcast() 794 } 795} 796 797// ChannelzMetric returns ServerInternalMetric of current server. 798// This is an EXPERIMENTAL API. 799func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric { 800 s.czmu.RLock() 801 defer s.czmu.RUnlock() 802 return &channelz.ServerInternalMetric{ 803 CallsStarted: s.callsStarted, 804 CallsSucceeded: s.callsSucceeded, 805 CallsFailed: s.callsFailed, 806 LastCallStartedTimestamp: s.lastCallStartedTime, 807 } 808} 809 810func (s *Server) incrCallsStarted() { 811 s.czmu.Lock() 812 s.callsStarted++ 813 s.lastCallStartedTime = time.Now() 814 s.czmu.Unlock() 815} 816 817func (s *Server) incrCallsSucceeded() { 818 s.czmu.Lock() 819 s.callsSucceeded++ 820 s.czmu.Unlock() 821} 822 823func (s *Server) incrCallsFailed() { 824 s.czmu.Lock() 825 s.callsFailed++ 826 s.czmu.Unlock() 827} 828 829func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { 830 var ( 831 outPayload *stats.OutPayload 832 ) 833 if s.opts.statsHandler != nil { 834 outPayload = &stats.OutPayload{} 835 } 836 hdr, data, err := encode(s.getCodec(stream.ContentSubtype()), msg, cp, outPayload, comp) 837 if err != nil { 838 grpclog.Errorln("grpc: server failed to encode response: ", err) 839 return err 840 } 841 if len(data) > s.opts.maxSendMessageSize { 842 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize) 843 } 844 err = t.Write(stream, hdr, data, opts) 845 if err == nil && outPayload != nil { 846 outPayload.SentTime = time.Now() 847 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload) 848 } 849 return err 850} 851 852func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { 853 if channelz.IsOn() { 854 s.incrCallsStarted() 855 defer func() { 856 if err != nil && err != io.EOF { 857 s.incrCallsFailed() 858 } else { 859 s.incrCallsSucceeded() 860 } 861 }() 862 } 863 sh := s.opts.statsHandler 864 if sh != nil { 865 beginTime := time.Now() 866 begin := &stats.Begin{ 867 BeginTime: beginTime, 868 } 869 sh.HandleRPC(stream.Context(), begin) 870 defer func() { 871 end := &stats.End{ 872 BeginTime: beginTime, 873 EndTime: time.Now(), 874 } 875 if err != nil && err != io.EOF { 876 end.Error = toRPCErr(err) 877 } 878 sh.HandleRPC(stream.Context(), end) 879 }() 880 } 881 if trInfo != nil { 882 defer trInfo.tr.Finish() 883 trInfo.firstLine.client = false 884 trInfo.tr.LazyLog(&trInfo.firstLine, false) 885 defer func() { 886 if err != nil && err != io.EOF { 887 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 888 trInfo.tr.SetError() 889 } 890 }() 891 } 892 893 // comp and cp are used for compression. decomp and dc are used for 894 // decompression. If comp and decomp are both set, they are the same; 895 // however they are kept separate to ensure that at most one of the 896 // compressor/decompressor variable pairs are set for use later. 897 var comp, decomp encoding.Compressor 898 var cp Compressor 899 var dc Decompressor 900 901 // If dc is set and matches the stream's compression, use it. Otherwise, try 902 // to find a matching registered compressor for decomp. 903 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 904 dc = s.opts.dc 905 } else if rc != "" && rc != encoding.Identity { 906 decomp = encoding.GetCompressor(rc) 907 if decomp == nil { 908 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 909 t.WriteStatus(stream, st) 910 return st.Err() 911 } 912 } 913 914 // If cp is set, use it. Otherwise, attempt to compress the response using 915 // the incoming message compression method. 916 // 917 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 918 if s.opts.cp != nil { 919 cp = s.opts.cp 920 stream.SetSendCompress(cp.Type()) 921 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 922 // Legacy compressor not specified; attempt to respond with same encoding. 923 comp = encoding.GetCompressor(rc) 924 if comp != nil { 925 stream.SetSendCompress(rc) 926 } 927 } 928 929 p := &parser{r: stream} 930 pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize) 931 if err == io.EOF { 932 // The entire stream is done (for unary RPC only). 933 return err 934 } 935 if err == io.ErrUnexpectedEOF { 936 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) 937 } 938 if err != nil { 939 if st, ok := status.FromError(err); ok { 940 if e := t.WriteStatus(stream, st); e != nil { 941 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 942 } 943 } else { 944 switch st := err.(type) { 945 case transport.ConnectionError: 946 // Nothing to do here. 947 case transport.StreamError: 948 if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { 949 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 950 } 951 default: 952 panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st)) 953 } 954 } 955 return err 956 } 957 if channelz.IsOn() { 958 t.IncrMsgRecv() 959 } 960 if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil { 961 if e := t.WriteStatus(stream, st); e != nil { 962 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 963 } 964 return st.Err() 965 } 966 var inPayload *stats.InPayload 967 if sh != nil { 968 inPayload = &stats.InPayload{ 969 RecvTime: time.Now(), 970 } 971 } 972 df := func(v interface{}) error { 973 if inPayload != nil { 974 inPayload.WireLength = len(req) 975 } 976 if pf == compressionMade { 977 var err error 978 if dc != nil { 979 req, err = dc.Do(bytes.NewReader(req)) 980 if err != nil { 981 return status.Errorf(codes.Internal, err.Error()) 982 } 983 } else { 984 tmp, _ := decomp.Decompress(bytes.NewReader(req)) 985 req, err = ioutil.ReadAll(tmp) 986 if err != nil { 987 return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) 988 } 989 } 990 } 991 if len(req) > s.opts.maxReceiveMessageSize { 992 // TODO: Revisit the error code. Currently keep it consistent with 993 // java implementation. 994 return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize) 995 } 996 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil { 997 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) 998 } 999 if inPayload != nil { 1000 inPayload.Payload = v 1001 inPayload.Data = req 1002 inPayload.Length = len(req) 1003 sh.HandleRPC(stream.Context(), inPayload) 1004 } 1005 if trInfo != nil { 1006 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) 1007 } 1008 return nil 1009 } 1010 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1011 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) 1012 if appErr != nil { 1013 appStatus, ok := status.FromError(appErr) 1014 if !ok { 1015 // Convert appErr if it is not a grpc status error. 1016 appErr = status.Error(codes.Unknown, appErr.Error()) 1017 appStatus, _ = status.FromError(appErr) 1018 } 1019 if trInfo != nil { 1020 trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1021 trInfo.tr.SetError() 1022 } 1023 if e := t.WriteStatus(stream, appStatus); e != nil { 1024 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) 1025 } 1026 return appErr 1027 } 1028 if trInfo != nil { 1029 trInfo.tr.LazyLog(stringer("OK"), false) 1030 } 1031 opts := &transport.Options{ 1032 Last: true, 1033 Delay: false, 1034 } 1035 1036 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { 1037 if err == io.EOF { 1038 // The entire stream is done (for unary RPC only). 1039 return err 1040 } 1041 if s, ok := status.FromError(err); ok { 1042 if e := t.WriteStatus(stream, s); e != nil { 1043 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) 1044 } 1045 } else { 1046 switch st := err.(type) { 1047 case transport.ConnectionError: 1048 // Nothing to do here. 1049 case transport.StreamError: 1050 if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { 1051 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 1052 } 1053 default: 1054 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) 1055 } 1056 } 1057 return err 1058 } 1059 if channelz.IsOn() { 1060 t.IncrMsgSent() 1061 } 1062 if trInfo != nil { 1063 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) 1064 } 1065 // TODO: Should we be logging if writing status failed here, like above? 1066 // Should the logging be in WriteStatus? Should we ignore the WriteStatus 1067 // error or allow the stats handler to see it? 1068 return t.WriteStatus(stream, status.New(codes.OK, "")) 1069} 1070 1071func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { 1072 if channelz.IsOn() { 1073 s.incrCallsStarted() 1074 defer func() { 1075 if err != nil && err != io.EOF { 1076 s.incrCallsFailed() 1077 } else { 1078 s.incrCallsSucceeded() 1079 } 1080 }() 1081 } 1082 sh := s.opts.statsHandler 1083 if sh != nil { 1084 beginTime := time.Now() 1085 begin := &stats.Begin{ 1086 BeginTime: beginTime, 1087 } 1088 sh.HandleRPC(stream.Context(), begin) 1089 defer func() { 1090 end := &stats.End{ 1091 BeginTime: beginTime, 1092 EndTime: time.Now(), 1093 } 1094 if err != nil && err != io.EOF { 1095 end.Error = toRPCErr(err) 1096 } 1097 sh.HandleRPC(stream.Context(), end) 1098 }() 1099 } 1100 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1101 ss := &serverStream{ 1102 ctx: ctx, 1103 t: t, 1104 s: stream, 1105 p: &parser{r: stream}, 1106 codec: s.getCodec(stream.ContentSubtype()), 1107 maxReceiveMessageSize: s.opts.maxReceiveMessageSize, 1108 maxSendMessageSize: s.opts.maxSendMessageSize, 1109 trInfo: trInfo, 1110 statsHandler: sh, 1111 } 1112 1113 // If dc is set and matches the stream's compression, use it. Otherwise, try 1114 // to find a matching registered compressor for decomp. 1115 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1116 ss.dc = s.opts.dc 1117 } else if rc != "" && rc != encoding.Identity { 1118 ss.decomp = encoding.GetCompressor(rc) 1119 if ss.decomp == nil { 1120 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1121 t.WriteStatus(ss.s, st) 1122 return st.Err() 1123 } 1124 } 1125 1126 // If cp is set, use it. Otherwise, attempt to compress the response using 1127 // the incoming message compression method. 1128 // 1129 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1130 if s.opts.cp != nil { 1131 ss.cp = s.opts.cp 1132 stream.SetSendCompress(s.opts.cp.Type()) 1133 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1134 // Legacy compressor not specified; attempt to respond with same encoding. 1135 ss.comp = encoding.GetCompressor(rc) 1136 if ss.comp != nil { 1137 stream.SetSendCompress(rc) 1138 } 1139 } 1140 1141 if trInfo != nil { 1142 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1143 defer func() { 1144 ss.mu.Lock() 1145 if err != nil && err != io.EOF { 1146 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1147 ss.trInfo.tr.SetError() 1148 } 1149 ss.trInfo.tr.Finish() 1150 ss.trInfo.tr = nil 1151 ss.mu.Unlock() 1152 }() 1153 } 1154 var appErr error 1155 var server interface{} 1156 if srv != nil { 1157 server = srv.server 1158 } 1159 if s.opts.streamInt == nil { 1160 appErr = sd.Handler(server, ss) 1161 } else { 1162 info := &StreamServerInfo{ 1163 FullMethod: stream.Method(), 1164 IsClientStream: sd.ClientStreams, 1165 IsServerStream: sd.ServerStreams, 1166 } 1167 appErr = s.opts.streamInt(server, ss, info, sd.Handler) 1168 } 1169 if appErr != nil { 1170 appStatus, ok := status.FromError(appErr) 1171 if !ok { 1172 switch err := appErr.(type) { 1173 case transport.StreamError: 1174 appStatus = status.New(err.Code, err.Desc) 1175 default: 1176 appStatus = status.New(codes.Unknown, appErr.Error()) 1177 } 1178 appErr = appStatus.Err() 1179 } 1180 if trInfo != nil { 1181 ss.mu.Lock() 1182 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1183 ss.trInfo.tr.SetError() 1184 ss.mu.Unlock() 1185 } 1186 t.WriteStatus(ss.s, appStatus) 1187 // TODO: Should we log an error from WriteStatus here and below? 1188 return appErr 1189 } 1190 if trInfo != nil { 1191 ss.mu.Lock() 1192 ss.trInfo.tr.LazyLog(stringer("OK"), false) 1193 ss.mu.Unlock() 1194 } 1195 return t.WriteStatus(ss.s, status.New(codes.OK, "")) 1196} 1197 1198func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { 1199 sm := stream.Method() 1200 if sm != "" && sm[0] == '/' { 1201 sm = sm[1:] 1202 } 1203 pos := strings.LastIndex(sm, "/") 1204 if pos == -1 { 1205 if trInfo != nil { 1206 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) 1207 trInfo.tr.SetError() 1208 } 1209 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) 1210 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil { 1211 if trInfo != nil { 1212 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1213 trInfo.tr.SetError() 1214 } 1215 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) 1216 } 1217 if trInfo != nil { 1218 trInfo.tr.Finish() 1219 } 1220 return 1221 } 1222 service := sm[:pos] 1223 method := sm[pos+1:] 1224 srv, ok := s.m[service] 1225 if !ok { 1226 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1227 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1228 return 1229 } 1230 if trInfo != nil { 1231 trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true) 1232 trInfo.tr.SetError() 1233 } 1234 errDesc := fmt.Sprintf("unknown service %v", service) 1235 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1236 if trInfo != nil { 1237 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1238 trInfo.tr.SetError() 1239 } 1240 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) 1241 } 1242 if trInfo != nil { 1243 trInfo.tr.Finish() 1244 } 1245 return 1246 } 1247 // Unary RPC or Streaming RPC? 1248 if md, ok := srv.md[method]; ok { 1249 s.processUnaryRPC(t, stream, srv, md, trInfo) 1250 return 1251 } 1252 if sd, ok := srv.sd[method]; ok { 1253 s.processStreamingRPC(t, stream, srv, sd, trInfo) 1254 return 1255 } 1256 if trInfo != nil { 1257 trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true) 1258 trInfo.tr.SetError() 1259 } 1260 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1261 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1262 return 1263 } 1264 errDesc := fmt.Sprintf("unknown method %v", method) 1265 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1266 if trInfo != nil { 1267 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1268 trInfo.tr.SetError() 1269 } 1270 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) 1271 } 1272 if trInfo != nil { 1273 trInfo.tr.Finish() 1274 } 1275} 1276 1277// The key to save ServerTransportStream in the context. 1278type streamKey struct{} 1279 1280// NewContextWithServerTransportStream creates a new context from ctx and 1281// attaches stream to it. 1282// 1283// This API is EXPERIMENTAL. 1284func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { 1285 return context.WithValue(ctx, streamKey{}, stream) 1286} 1287 1288// ServerTransportStream is a minimal interface that a transport stream must 1289// implement. This can be used to mock an actual transport stream for tests of 1290// handler code that use, for example, grpc.SetHeader (which requires some 1291// stream to be in context). 1292// 1293// See also NewContextWithServerTransportStream. 1294// 1295// This API is EXPERIMENTAL. 1296type ServerTransportStream interface { 1297 Method() string 1298 SetHeader(md metadata.MD) error 1299 SendHeader(md metadata.MD) error 1300 SetTrailer(md metadata.MD) error 1301} 1302 1303// ServerTransportStreamFromContext returns the ServerTransportStream saved in 1304// ctx. Returns nil if the given context has no stream associated with it 1305// (which implies it is not an RPC invocation context). 1306// 1307// This API is EXPERIMENTAL. 1308func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { 1309 s, _ := ctx.Value(streamKey{}).(ServerTransportStream) 1310 return s 1311} 1312 1313// Stop stops the gRPC server. It immediately closes all open 1314// connections and listeners. 1315// It cancels all active RPCs on the server side and the corresponding 1316// pending RPCs on the client side will get notified by connection 1317// errors. 1318func (s *Server) Stop() { 1319 s.quitOnce.Do(func() { 1320 close(s.quit) 1321 }) 1322 1323 defer func() { 1324 s.serveWG.Wait() 1325 s.doneOnce.Do(func() { 1326 close(s.done) 1327 }) 1328 }() 1329 1330 s.channelzRemoveOnce.Do(func() { 1331 if channelz.IsOn() { 1332 channelz.RemoveEntry(s.channelzID) 1333 } 1334 }) 1335 1336 s.mu.Lock() 1337 listeners := s.lis 1338 s.lis = nil 1339 st := s.conns 1340 s.conns = nil 1341 // interrupt GracefulStop if Stop and GracefulStop are called concurrently. 1342 s.cv.Broadcast() 1343 s.mu.Unlock() 1344 1345 for lis := range listeners { 1346 lis.Close() 1347 } 1348 for c := range st { 1349 c.Close() 1350 } 1351 1352 s.mu.Lock() 1353 if s.events != nil { 1354 s.events.Finish() 1355 s.events = nil 1356 } 1357 s.mu.Unlock() 1358} 1359 1360// GracefulStop stops the gRPC server gracefully. It stops the server from 1361// accepting new connections and RPCs and blocks until all the pending RPCs are 1362// finished. 1363func (s *Server) GracefulStop() { 1364 s.quitOnce.Do(func() { 1365 close(s.quit) 1366 }) 1367 1368 defer func() { 1369 s.doneOnce.Do(func() { 1370 close(s.done) 1371 }) 1372 }() 1373 1374 s.channelzRemoveOnce.Do(func() { 1375 if channelz.IsOn() { 1376 channelz.RemoveEntry(s.channelzID) 1377 } 1378 }) 1379 s.mu.Lock() 1380 if s.conns == nil { 1381 s.mu.Unlock() 1382 return 1383 } 1384 1385 for lis := range s.lis { 1386 lis.Close() 1387 } 1388 s.lis = nil 1389 if !s.drain { 1390 for c := range s.conns { 1391 c.(transport.ServerTransport).Drain() 1392 } 1393 s.drain = true 1394 } 1395 1396 // Wait for serving threads to be ready to exit. Only then can we be sure no 1397 // new conns will be created. 1398 s.mu.Unlock() 1399 s.serveWG.Wait() 1400 s.mu.Lock() 1401 1402 for len(s.conns) != 0 { 1403 s.cv.Wait() 1404 } 1405 s.conns = nil 1406 if s.events != nil { 1407 s.events.Finish() 1408 s.events = nil 1409 } 1410 s.mu.Unlock() 1411} 1412 1413func init() { 1414 internal.TestingUseHandlerImpl = func(arg interface{}) { 1415 arg.(*Server).opts.useHandlerImpl = true 1416 } 1417} 1418 1419// contentSubtype must be lowercase 1420// cannot return nil 1421func (s *Server) getCodec(contentSubtype string) baseCodec { 1422 if s.opts.codec != nil { 1423 return s.opts.codec 1424 } 1425 if contentSubtype == "" { 1426 return encoding.GetCodec(proto.Name) 1427 } 1428 codec := encoding.GetCodec(contentSubtype) 1429 if codec == nil { 1430 return encoding.GetCodec(proto.Name) 1431 } 1432 return codec 1433} 1434 1435// SetHeader sets the header metadata. 1436// When called multiple times, all the provided metadata will be merged. 1437// All the metadata will be sent out when one of the following happens: 1438// - grpc.SendHeader() is called; 1439// - The first response is sent out; 1440// - An RPC status is sent out (error or success). 1441func SetHeader(ctx context.Context, md metadata.MD) error { 1442 if md.Len() == 0 { 1443 return nil 1444 } 1445 stream := ServerTransportStreamFromContext(ctx) 1446 if stream == nil { 1447 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1448 } 1449 return stream.SetHeader(md) 1450} 1451 1452// SendHeader sends header metadata. It may be called at most once. 1453// The provided md and headers set by SetHeader() will be sent. 1454func SendHeader(ctx context.Context, md metadata.MD) error { 1455 stream := ServerTransportStreamFromContext(ctx) 1456 if stream == nil { 1457 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1458 } 1459 if err := stream.SendHeader(md); err != nil { 1460 return toRPCErr(err) 1461 } 1462 return nil 1463} 1464 1465// SetTrailer sets the trailer metadata that will be sent when an RPC returns. 1466// When called more than once, all the provided metadata will be merged. 1467func SetTrailer(ctx context.Context, md metadata.MD) error { 1468 if md.Len() == 0 { 1469 return nil 1470 } 1471 stream := ServerTransportStreamFromContext(ctx) 1472 if stream == nil { 1473 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1474 } 1475 return stream.SetTrailer(md) 1476} 1477 1478// Method returns the method string for the server context. The returned 1479// string is in the format of "/service/method". 1480func Method(ctx context.Context) (string, bool) { 1481 s := ServerTransportStreamFromContext(ctx) 1482 if s == nil { 1483 return "", false 1484 } 1485 return s.Method(), true 1486} 1487