1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"bytes"
23	"errors"
24	"fmt"
25	"io"
26	"math"
27	"net"
28	"net/http"
29	"reflect"
30	"runtime"
31	"strings"
32	"sync"
33	"time"
34
35	"io/ioutil"
36
37	"golang.org/x/net/context"
38	"golang.org/x/net/http2"
39	"golang.org/x/net/trace"
40
41	"google.golang.org/grpc/channelz"
42	"google.golang.org/grpc/codes"
43	"google.golang.org/grpc/credentials"
44	"google.golang.org/grpc/encoding"
45	"google.golang.org/grpc/encoding/proto"
46	"google.golang.org/grpc/grpclog"
47	"google.golang.org/grpc/internal"
48	"google.golang.org/grpc/keepalive"
49	"google.golang.org/grpc/metadata"
50	"google.golang.org/grpc/stats"
51	"google.golang.org/grpc/status"
52	"google.golang.org/grpc/tap"
53	"google.golang.org/grpc/transport"
54)
55
// Default per-message size limits, used by defaultServerOptions when the
// corresponding ServerOption is not supplied.
const (
	// Inbound messages are limited to 4 MiB by default.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// Outbound messages default to the largest int32 value.
	defaultServerMaxSendMessageSize    = math.MaxInt32
)
60
// methodHandler is the function signature of a unary method handler. It is
// invoked with the registered service implementation (srv), the call context,
// a dec function that decodes the request into the value passed to it, and the
// server's unary interceptor (which may be nil). It returns the reply message
// or an error.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
62
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the key under which the method is registered within its
	// service.
	MethodName string
	// Handler is called to process a unary RPC for this method.
	Handler    methodHandler
}
68
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	// ServiceName is the key under which the service is registered on the
	// server; registering the same name twice is fatal.
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	// Methods lists the unary methods of the service.
	Methods     []MethodDesc
	// Streams lists the streaming methods of the service.
	Streams     []StreamDesc
	// Metadata is stored with the service at registration and reported
	// unchanged by GetServiceInfo.
	Metadata    interface{}
}
79
// service consists of the information of the server serving this service and
// the methods in this service. One value is built per registered ServiceDesc.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc // unary methods, keyed by MethodName
	sd     map[string]*StreamDesc // streaming methods, keyed by StreamName
	mdata  interface{}            // copy of ServiceDesc.Metadata
}
88
// Server is a gRPC server to serve RPC requests.
type Server struct {
	// opts is the configuration assembled by NewServer from the defaults and
	// the supplied ServerOptions.
	opts options

	mu     sync.Mutex // guards following
	// lis holds the listeners currently being served; Serve treats a nil map
	// as "server already stopped".
	lis    map[net.Listener]bool
	// conns holds the live connections/transports added by addConn; a nil map
	// makes addConn reject (and close) new connections.
	conns  map[io.Closer]bool
	// serve is set once Serve has been called; registration after that is fatal.
	serve  bool
	// drain, when set, makes addConn drain newly added transports immediately.
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	// events is the trace event log; it is only created when EnableTracing is set.
	events trace.EventLog

	// quit and done are observed by Serve: when quit is readable Serve returns
	// nil and waits on done before returning.
	quit               chan struct{}
	done               chan struct{}
	quitOnce           sync.Once
	doneOnce           sync.Once
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID          int64 // channelz unique identification number
	// czmu guards the call counters and timestamp below.
	czmu                sync.RWMutex
	callsStarted        int64
	callsFailed         int64
	callsSucceeded      int64
	lastCallStartedTime time.Time
}
116
// options collects every tunable of a Server. Values start from
// defaultServerOptions and are then modified by the ServerOption functions
// passed to NewServer (Creds, CustomCodec, MaxRecvMsgSize, ...).
type options struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	useHandlerImpl        bool // use http.Handler-based server
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
}
139
// defaultServerOptions is the starting configuration copied by NewServer
// before any ServerOption is applied: the default receive/send message size
// limits and a 120 second connection-establishment timeout. All other fields
// start at their zero value.
var defaultServerOptions = options{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
}
145
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
// NewServer applies the supplied ServerOptions, in the order given, to a copy
// of the default options.
type ServerOption func(*options)
148
149// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
150// before doing a write on the wire.
151func WriteBufferSize(s int) ServerOption {
152	return func(o *options) {
153		o.writeBufferSize = s
154	}
155}
156
157// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
158// for one read syscall.
159func ReadBufferSize(s int) ServerOption {
160	return func(o *options) {
161		o.readBufferSize = s
162	}
163}
164
165// InitialWindowSize returns a ServerOption that sets window size for stream.
166// The lower bound for window size is 64K and any value smaller than that will be ignored.
167func InitialWindowSize(s int32) ServerOption {
168	return func(o *options) {
169		o.initialWindowSize = s
170	}
171}
172
173// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
174// The lower bound for window size is 64K and any value smaller than that will be ignored.
175func InitialConnWindowSize(s int32) ServerOption {
176	return func(o *options) {
177		o.initialConnWindowSize = s
178	}
179}
180
181// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
182func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
183	return func(o *options) {
184		o.keepaliveParams = kp
185	}
186}
187
188// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
189func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
190	return func(o *options) {
191		o.keepalivePolicy = kep
192	}
193}
194
195// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
196//
197// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
198func CustomCodec(codec Codec) ServerOption {
199	return func(o *options) {
200		o.codec = codec
201	}
202}
203
204// RPCCompressor returns a ServerOption that sets a compressor for outbound
205// messages.  For backward compatibility, all outbound messages will be sent
206// using this compressor, regardless of incoming message compression.  By
207// default, server messages will be sent using the same compressor with which
208// request messages were sent.
209//
210// Deprecated: use encoding.RegisterCompressor instead.
211func RPCCompressor(cp Compressor) ServerOption {
212	return func(o *options) {
213		o.cp = cp
214	}
215}
216
217// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
218// messages.  It has higher priority than decompressors registered via
219// encoding.RegisterCompressor.
220//
221// Deprecated: use encoding.RegisterCompressor instead.
222func RPCDecompressor(dc Decompressor) ServerOption {
223	return func(o *options) {
224		o.dc = dc
225	}
226}
227
228// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
229// If this is not set, gRPC uses the default limit.
230//
231// Deprecated: use MaxRecvMsgSize instead.
232func MaxMsgSize(m int) ServerOption {
233	return MaxRecvMsgSize(m)
234}
235
236// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
237// If this is not set, gRPC uses the default 4MB.
238func MaxRecvMsgSize(m int) ServerOption {
239	return func(o *options) {
240		o.maxReceiveMessageSize = m
241	}
242}
243
244// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
245// If this is not set, gRPC uses the default 4MB.
246func MaxSendMsgSize(m int) ServerOption {
247	return func(o *options) {
248		o.maxSendMessageSize = m
249	}
250}
251
252// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
253// of concurrent streams to each ServerTransport.
254func MaxConcurrentStreams(n uint32) ServerOption {
255	return func(o *options) {
256		o.maxConcurrentStreams = n
257	}
258}
259
260// Creds returns a ServerOption that sets credentials for server connections.
261func Creds(c credentials.TransportCredentials) ServerOption {
262	return func(o *options) {
263		o.creds = c
264	}
265}
266
267// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
268// server. Only one unary interceptor can be installed. The construction of multiple
269// interceptors (e.g., chaining) can be implemented at the caller.
270func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
271	return func(o *options) {
272		if o.unaryInt != nil {
273			panic("The unary server interceptor was already set and may not be reset.")
274		}
275		o.unaryInt = i
276	}
277}
278
279// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
280// server. Only one stream interceptor can be installed.
281func StreamInterceptor(i StreamServerInterceptor) ServerOption {
282	return func(o *options) {
283		if o.streamInt != nil {
284			panic("The stream server interceptor was already set and may not be reset.")
285		}
286		o.streamInt = i
287	}
288}
289
290// InTapHandle returns a ServerOption that sets the tap handle for all the server
291// transport to be created. Only one can be installed.
292func InTapHandle(h tap.ServerInHandle) ServerOption {
293	return func(o *options) {
294		if o.inTapHandle != nil {
295			panic("The tap handle was already set and may not be reset.")
296		}
297		o.inTapHandle = h
298	}
299}
300
301// StatsHandler returns a ServerOption that sets the stats handler for the server.
302func StatsHandler(h stats.Handler) ServerOption {
303	return func(o *options) {
304		o.statsHandler = h
305	}
306}
307
308// UnknownServiceHandler returns a ServerOption that allows for adding a custom
309// unknown service handler. The provided method is a bidi-streaming RPC service
310// handler that will be invoked instead of returning the "unimplemented" gRPC
311// error whenever a request is received for an unregistered service or method.
312// The handling function has full access to the Context of the request and the
313// stream, and the invocation bypasses interceptors.
314func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
315	return func(o *options) {
316		o.unknownStreamDesc = &StreamDesc{
317			StreamName: "unknown_service_handler",
318			Handler:    streamHandler,
319			// We need to assume that the users of the streamHandler will want to use both.
320			ClientStreams: true,
321			ServerStreams: true,
322		}
323	}
324}
325
326// ConnectionTimeout returns a ServerOption that sets the timeout for
327// connection establishment (up to and including HTTP/2 handshaking) for all
328// new connections.  If this is not set, the default is 120 seconds.  A zero or
329// negative value will result in an immediate timeout.
330//
331// This API is EXPERIMENTAL.
332func ConnectionTimeout(d time.Duration) ServerOption {
333	return func(o *options) {
334		o.connectionTimeout = d
335	}
336}
337
338// NewServer creates a gRPC server which has no service registered and has not
339// started to accept requests yet.
340func NewServer(opt ...ServerOption) *Server {
341	opts := defaultServerOptions
342	for _, o := range opt {
343		o(&opts)
344	}
345	s := &Server{
346		lis:   make(map[net.Listener]bool),
347		opts:  opts,
348		conns: make(map[io.Closer]bool),
349		m:     make(map[string]*service),
350		quit:  make(chan struct{}),
351		done:  make(chan struct{}),
352	}
353	s.cv = sync.NewCond(&s.mu)
354	if EnableTracing {
355		_, file, line, _ := runtime.Caller(1)
356		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
357	}
358
359	if channelz.IsOn() {
360		s.channelzID = channelz.RegisterServer(s, "")
361	}
362	return s
363}
364
365// printf records an event in s's event log, unless s has been stopped.
366// REQUIRES s.mu is held.
367func (s *Server) printf(format string, a ...interface{}) {
368	if s.events != nil {
369		s.events.Printf(format, a...)
370	}
371}
372
373// errorf records an error in s's event log, unless s has been stopped.
374// REQUIRES s.mu is held.
375func (s *Server) errorf(format string, a ...interface{}) {
376	if s.events != nil {
377		s.events.Errorf(format, a...)
378	}
379}
380
381// RegisterService registers a service and its implementation to the gRPC
382// server. It is called from the IDL generated code. This must be called before
383// invoking Serve.
384func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
385	ht := reflect.TypeOf(sd.HandlerType).Elem()
386	st := reflect.TypeOf(ss)
387	if !st.Implements(ht) {
388		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
389	}
390	s.register(sd, ss)
391}
392
393func (s *Server) register(sd *ServiceDesc, ss interface{}) {
394	s.mu.Lock()
395	defer s.mu.Unlock()
396	s.printf("RegisterService(%q)", sd.ServiceName)
397	if s.serve {
398		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
399	}
400	if _, ok := s.m[sd.ServiceName]; ok {
401		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
402	}
403	srv := &service{
404		server: ss,
405		md:     make(map[string]*MethodDesc),
406		sd:     make(map[string]*StreamDesc),
407		mdata:  sd.Metadata,
408	}
409	for i := range sd.Methods {
410		d := &sd.Methods[i]
411		srv.md[d.MethodName] = d
412	}
413	for i := range sd.Streams {
414		d := &sd.Streams[i]
415		srv.sd[d.StreamName] = d
416	}
417	s.m[sd.ServiceName] = srv
418}
419
// MethodInfo contains the information of an RPC including its method name and type.
// GetServiceInfo reports unary methods with both stream flags set to false.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
429
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists both the unary and the streaming methods of the service.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
436
437// GetServiceInfo returns a map from service names to ServiceInfo.
438// Service names include the package names, in the form of <package>.<service>.
439func (s *Server) GetServiceInfo() map[string]ServiceInfo {
440	ret := make(map[string]ServiceInfo)
441	for n, srv := range s.m {
442		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
443		for m := range srv.md {
444			methods = append(methods, MethodInfo{
445				Name:           m,
446				IsClientStream: false,
447				IsServerStream: false,
448			})
449		}
450		for m, d := range srv.sd {
451			methods = append(methods, MethodInfo{
452				Name:           m,
453				IsClientStream: d.ClientStreams,
454				IsServerStream: d.ServerStreams,
455			})
456		}
457
458		ret[n] = ServiceInfo{
459			Methods:  methods,
460			Metadata: srv.mdata,
461		}
462	}
463	return ret
464}
465
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called on a server whose
// listener set has already been cleared.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
469
470func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
471	if s.opts.creds == nil {
472		return rawConn, nil, nil
473	}
474	return s.opts.creds.ServerHandshake(rawConn)
475}
476
// listenSocket wraps the net.Listener passed to Serve together with the
// channelz ID under which the listener is registered (assigned only when
// channelz is on).
type listenSocket struct {
	net.Listener
	channelzID int64
}
481
482func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
483	return &channelz.SocketInternalMetric{
484		LocalAddr: l.Listener.Addr(),
485	}
486}
487
488func (l *listenSocket) Close() error {
489	err := l.Listener.Close()
490	if channelz.IsOn() {
491		channelz.RemoveEntry(l.channelzID)
492	}
493	return err
494}
495
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Accept errors that report themselves as temporary are retried with an
// exponential backoff starting at 5ms and capped at 1s. If Serve is called
// when the listener set is already nil, lis is closed and ErrServerStopped is
// returned.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	// From this point on, service registration is rejected (see register).
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Account for this Serve call in serveWG. This deferred function is
	// registered first and therefore runs last, after the listener cleanup
	// below.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	// Wrap the listener so it can carry a channelz ID, and track it in s.lis.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "")
	}
	s.mu.Unlock()

	// On return, close and forget the listener if it is still tracked in s.lis.
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Temporary errors: back off (5ms, doubling, at most 1s) and retry.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Wait out the backoff, but stop waiting as soon as quit is
				// readable.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// A non-temporary Accept error observed once quit is readable is
			// reported as a clean (nil) return.
			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
593
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
//
// It first performs the transport-credentials handshake and, unless the
// http.Handler-based implementation is selected, the HTTP/2 transport setup,
// all under a deadline of s.opts.connectionTimeout. On success the
// connection is registered with addConn and served in a new goroutine, and is
// unregistered with removeConn once serving ends.
func (s *Server) handleRawConn(rawConn net.Conn) {
	// Bound the whole connection establishment by the configured timeout.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		s.mu.Lock()
		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		s.mu.Unlock()
		grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
		// If serverHandshake returns ErrConnDispatched, keep rawConn open.
		if err != credentials.ErrConnDispatched {
			rawConn.Close()
		}
		// Clear the establishment deadline before giving up on the connection.
		rawConn.SetDeadline(time.Time{})
		return
	}

	// A nil s.conns means connections are no longer accepted: drop this one.
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		conn.Close()
		return
	}
	s.mu.Unlock()

	// serve is the blocking function that services the connection; c is the
	// value tracked in s.conns (the conn itself, or the HTTP/2 transport).
	var serve func()
	c := conn.(io.Closer)
	if s.opts.useHandlerImpl {
		serve = func() { s.serveUsingHandler(conn) }
	} else {
		// Finish handshaking (HTTP2)
		st := s.newHTTP2Transport(conn, authInfo)
		if st == nil {
			// newHTTP2Transport has already closed the connection and logged
			// the failure.
			return
		}
		c = st
		serve = func() { s.serveStreams(st) }
	}

	// Establishment is complete: remove the deadline set at the top.
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(c) {
		// addConn closed c because connections are no longer accepted.
		return
	}
	go func() {
		serve()
		s.removeConn(c)
	}()
}
643
644// newHTTP2Transport sets up a http/2 transport (using the
645// gRPC http2 server transport in transport/http2_server.go).
646func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
647	config := &transport.ServerConfig{
648		MaxStreams:            s.opts.maxConcurrentStreams,
649		AuthInfo:              authInfo,
650		InTapHandle:           s.opts.inTapHandle,
651		StatsHandler:          s.opts.statsHandler,
652		KeepaliveParams:       s.opts.keepaliveParams,
653		KeepalivePolicy:       s.opts.keepalivePolicy,
654		InitialWindowSize:     s.opts.initialWindowSize,
655		InitialConnWindowSize: s.opts.initialConnWindowSize,
656		WriteBufferSize:       s.opts.writeBufferSize,
657		ReadBufferSize:        s.opts.readBufferSize,
658		ChannelzParentID:      s.channelzID,
659	}
660	st, err := transport.NewServerTransport("http2", c, config)
661	if err != nil {
662		s.mu.Lock()
663		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
664		s.mu.Unlock()
665		c.Close()
666		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
667		return nil
668	}
669
670	return st
671}
672
673func (s *Server) serveStreams(st transport.ServerTransport) {
674	defer st.Close()
675	var wg sync.WaitGroup
676	st.HandleStreams(func(stream *transport.Stream) {
677		wg.Add(1)
678		go func() {
679			defer wg.Done()
680			s.handleStream(st, stream, s.traceInfo(st, stream))
681		}()
682	}, func(ctx context.Context, method string) context.Context {
683		if !EnableTracing {
684			return ctx
685		}
686		tr := trace.New("grpc.Recv."+methodFamily(method), method)
687		return trace.NewContext(ctx, tr)
688	})
689	wg.Wait()
690}
691
// Compile-time check that *Server implements http.Handler (see ServeHTTP).
var _ http.Handler = (*Server)(nil)
693
694// serveUsingHandler is called from handleRawConn when s is configured
695// to handle requests via the http.Handler interface. It sets up a
696// net/http.Server to handle the just-accepted conn. The http.Server
697// is configured to route all incoming requests (all HTTP/2 streams)
698// to ServeHTTP, which creates a new ServerTransport for each stream.
699// serveUsingHandler blocks until conn closes.
700//
701// This codepath is only used when Server.TestingUseHandlerImpl has
702// been configured. This lets the end2end tests exercise the ServeHTTP
703// method as one of the environment types.
704//
705// conn is the *tls.Conn that's already been authenticated.
706func (s *Server) serveUsingHandler(conn net.Conn) {
707	h2s := &http2.Server{
708		MaxConcurrentStreams: s.opts.maxConcurrentStreams,
709	}
710	h2s.ServeConn(conn, &http2.ServeConnOpts{
711		Handler: s,
712	})
713}
714
715// ServeHTTP implements the Go standard library's http.Handler
716// interface by responding to the gRPC request r, by looking up
717// the requested gRPC method in the gRPC server s.
718//
719// The provided HTTP request must have arrived on an HTTP/2
720// connection. When using the Go standard library's server,
721// practically this means that the Request must also have arrived
722// over TLS.
723//
724// To share one port (such as 443 for https) between gRPC and an
725// existing http.Handler, use a root http.Handler such as:
726//
727//   if r.ProtoMajor == 2 && strings.HasPrefix(
728//   	r.Header.Get("Content-Type"), "application/grpc") {
729//   	grpcServer.ServeHTTP(w, r)
730//   } else {
731//   	yourMux.ServeHTTP(w, r)
732//   }
733//
734// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
735// separate from grpc-go's HTTP/2 server. Performance and features may vary
736// between the two paths. ServeHTTP does not support some gRPC features
737// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
738// and subject to change.
739func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
740	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
741	if err != nil {
742		http.Error(w, err.Error(), http.StatusInternalServerError)
743		return
744	}
745	if !s.addConn(st) {
746		return
747	}
748	defer s.removeConn(st)
749	s.serveStreams(st)
750}
751
752// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
753// If tracing is not enabled, it returns nil.
754func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
755	tr, ok := trace.FromContext(stream.Context())
756	if !ok {
757		return nil
758	}
759
760	trInfo = &traceInfo{
761		tr: tr,
762	}
763	trInfo.firstLine.client = false
764	trInfo.firstLine.remoteAddr = st.RemoteAddr()
765
766	if dl, ok := stream.Context().Deadline(); ok {
767		trInfo.firstLine.deadline = dl.Sub(time.Now())
768	}
769	return trInfo
770}
771
772func (s *Server) addConn(c io.Closer) bool {
773	s.mu.Lock()
774	defer s.mu.Unlock()
775	if s.conns == nil {
776		c.Close()
777		return false
778	}
779	if s.drain {
780		// Transport added after we drained our existing conns: drain it
781		// immediately.
782		c.(transport.ServerTransport).Drain()
783	}
784	s.conns[c] = true
785	return true
786}
787
788func (s *Server) removeConn(c io.Closer) {
789	s.mu.Lock()
790	defer s.mu.Unlock()
791	if s.conns != nil {
792		delete(s.conns, c)
793		s.cv.Broadcast()
794	}
795}
796
797// ChannelzMetric returns ServerInternalMetric of current server.
798// This is an EXPERIMENTAL API.
799func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric {
800	s.czmu.RLock()
801	defer s.czmu.RUnlock()
802	return &channelz.ServerInternalMetric{
803		CallsStarted:             s.callsStarted,
804		CallsSucceeded:           s.callsSucceeded,
805		CallsFailed:              s.callsFailed,
806		LastCallStartedTimestamp: s.lastCallStartedTime,
807	}
808}
809
810func (s *Server) incrCallsStarted() {
811	s.czmu.Lock()
812	s.callsStarted++
813	s.lastCallStartedTime = time.Now()
814	s.czmu.Unlock()
815}
816
817func (s *Server) incrCallsSucceeded() {
818	s.czmu.Lock()
819	s.callsSucceeded++
820	s.czmu.Unlock()
821}
822
823func (s *Server) incrCallsFailed() {
824	s.czmu.Lock()
825	s.callsFailed++
826	s.czmu.Unlock()
827}
828
829func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
830	var (
831		outPayload *stats.OutPayload
832	)
833	if s.opts.statsHandler != nil {
834		outPayload = &stats.OutPayload{}
835	}
836	hdr, data, err := encode(s.getCodec(stream.ContentSubtype()), msg, cp, outPayload, comp)
837	if err != nil {
838		grpclog.Errorln("grpc: server failed to encode response: ", err)
839		return err
840	}
841	if len(data) > s.opts.maxSendMessageSize {
842		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize)
843	}
844	err = t.Write(stream, hdr, data, opts)
845	if err == nil && outPayload != nil {
846		outPayload.SentTime = time.Now()
847		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
848	}
849	return err
850}
851
852func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
853	if channelz.IsOn() {
854		s.incrCallsStarted()
855		defer func() {
856			if err != nil && err != io.EOF {
857				s.incrCallsFailed()
858			} else {
859				s.incrCallsSucceeded()
860			}
861		}()
862	}
863	sh := s.opts.statsHandler
864	if sh != nil {
865		beginTime := time.Now()
866		begin := &stats.Begin{
867			BeginTime: beginTime,
868		}
869		sh.HandleRPC(stream.Context(), begin)
870		defer func() {
871			end := &stats.End{
872				BeginTime: beginTime,
873				EndTime:   time.Now(),
874			}
875			if err != nil && err != io.EOF {
876				end.Error = toRPCErr(err)
877			}
878			sh.HandleRPC(stream.Context(), end)
879		}()
880	}
881	if trInfo != nil {
882		defer trInfo.tr.Finish()
883		trInfo.firstLine.client = false
884		trInfo.tr.LazyLog(&trInfo.firstLine, false)
885		defer func() {
886			if err != nil && err != io.EOF {
887				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
888				trInfo.tr.SetError()
889			}
890		}()
891	}
892
893	// comp and cp are used for compression.  decomp and dc are used for
894	// decompression.  If comp and decomp are both set, they are the same;
895	// however they are kept separate to ensure that at most one of the
896	// compressor/decompressor variable pairs are set for use later.
897	var comp, decomp encoding.Compressor
898	var cp Compressor
899	var dc Decompressor
900
901	// If dc is set and matches the stream's compression, use it.  Otherwise, try
902	// to find a matching registered compressor for decomp.
903	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
904		dc = s.opts.dc
905	} else if rc != "" && rc != encoding.Identity {
906		decomp = encoding.GetCompressor(rc)
907		if decomp == nil {
908			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
909			t.WriteStatus(stream, st)
910			return st.Err()
911		}
912	}
913
914	// If cp is set, use it.  Otherwise, attempt to compress the response using
915	// the incoming message compression method.
916	//
917	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
918	if s.opts.cp != nil {
919		cp = s.opts.cp
920		stream.SetSendCompress(cp.Type())
921	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
922		// Legacy compressor not specified; attempt to respond with same encoding.
923		comp = encoding.GetCompressor(rc)
924		if comp != nil {
925			stream.SetSendCompress(rc)
926		}
927	}
928
929	p := &parser{r: stream}
930	pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
931	if err == io.EOF {
932		// The entire stream is done (for unary RPC only).
933		return err
934	}
935	if err == io.ErrUnexpectedEOF {
936		err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
937	}
938	if err != nil {
939		if st, ok := status.FromError(err); ok {
940			if e := t.WriteStatus(stream, st); e != nil {
941				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
942			}
943		} else {
944			switch st := err.(type) {
945			case transport.ConnectionError:
946				// Nothing to do here.
947			case transport.StreamError:
948				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
949					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
950				}
951			default:
952				panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
953			}
954		}
955		return err
956	}
957	if channelz.IsOn() {
958		t.IncrMsgRecv()
959	}
960	if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
961		if e := t.WriteStatus(stream, st); e != nil {
962			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
963		}
964		return st.Err()
965	}
966	var inPayload *stats.InPayload
967	if sh != nil {
968		inPayload = &stats.InPayload{
969			RecvTime: time.Now(),
970		}
971	}
972	df := func(v interface{}) error {
973		if inPayload != nil {
974			inPayload.WireLength = len(req)
975		}
976		if pf == compressionMade {
977			var err error
978			if dc != nil {
979				req, err = dc.Do(bytes.NewReader(req))
980				if err != nil {
981					return status.Errorf(codes.Internal, err.Error())
982				}
983			} else {
984				tmp, _ := decomp.Decompress(bytes.NewReader(req))
985				req, err = ioutil.ReadAll(tmp)
986				if err != nil {
987					return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
988				}
989			}
990		}
991		if len(req) > s.opts.maxReceiveMessageSize {
992			// TODO: Revisit the error code. Currently keep it consistent with
993			// java implementation.
994			return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
995		}
996		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil {
997			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
998		}
999		if inPayload != nil {
1000			inPayload.Payload = v
1001			inPayload.Data = req
1002			inPayload.Length = len(req)
1003			sh.HandleRPC(stream.Context(), inPayload)
1004		}
1005		if trInfo != nil {
1006			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
1007		}
1008		return nil
1009	}
1010	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1011	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
1012	if appErr != nil {
1013		appStatus, ok := status.FromError(appErr)
1014		if !ok {
1015			// Convert appErr if it is not a grpc status error.
1016			appErr = status.Error(codes.Unknown, appErr.Error())
1017			appStatus, _ = status.FromError(appErr)
1018		}
1019		if trInfo != nil {
1020			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1021			trInfo.tr.SetError()
1022		}
1023		if e := t.WriteStatus(stream, appStatus); e != nil {
1024			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1025		}
1026		return appErr
1027	}
1028	if trInfo != nil {
1029		trInfo.tr.LazyLog(stringer("OK"), false)
1030	}
1031	opts := &transport.Options{
1032		Last:  true,
1033		Delay: false,
1034	}
1035
1036	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1037		if err == io.EOF {
1038			// The entire stream is done (for unary RPC only).
1039			return err
1040		}
1041		if s, ok := status.FromError(err); ok {
1042			if e := t.WriteStatus(stream, s); e != nil {
1043				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1044			}
1045		} else {
1046			switch st := err.(type) {
1047			case transport.ConnectionError:
1048				// Nothing to do here.
1049			case transport.StreamError:
1050				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
1051					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
1052				}
1053			default:
1054				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1055			}
1056		}
1057		return err
1058	}
1059	if channelz.IsOn() {
1060		t.IncrMsgSent()
1061	}
1062	if trInfo != nil {
1063		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1064	}
1065	// TODO: Should we be logging if writing status failed here, like above?
1066	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
1067	// error or allow the stats handler to see it?
1068	return t.WriteStatus(stream, status.New(codes.OK, ""))
1069}
1070
1071func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1072	if channelz.IsOn() {
1073		s.incrCallsStarted()
1074		defer func() {
1075			if err != nil && err != io.EOF {
1076				s.incrCallsFailed()
1077			} else {
1078				s.incrCallsSucceeded()
1079			}
1080		}()
1081	}
1082	sh := s.opts.statsHandler
1083	if sh != nil {
1084		beginTime := time.Now()
1085		begin := &stats.Begin{
1086			BeginTime: beginTime,
1087		}
1088		sh.HandleRPC(stream.Context(), begin)
1089		defer func() {
1090			end := &stats.End{
1091				BeginTime: beginTime,
1092				EndTime:   time.Now(),
1093			}
1094			if err != nil && err != io.EOF {
1095				end.Error = toRPCErr(err)
1096			}
1097			sh.HandleRPC(stream.Context(), end)
1098		}()
1099	}
1100	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1101	ss := &serverStream{
1102		ctx:   ctx,
1103		t:     t,
1104		s:     stream,
1105		p:     &parser{r: stream},
1106		codec: s.getCodec(stream.ContentSubtype()),
1107		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1108		maxSendMessageSize:    s.opts.maxSendMessageSize,
1109		trInfo:                trInfo,
1110		statsHandler:          sh,
1111	}
1112
1113	// If dc is set and matches the stream's compression, use it.  Otherwise, try
1114	// to find a matching registered compressor for decomp.
1115	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1116		ss.dc = s.opts.dc
1117	} else if rc != "" && rc != encoding.Identity {
1118		ss.decomp = encoding.GetCompressor(rc)
1119		if ss.decomp == nil {
1120			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1121			t.WriteStatus(ss.s, st)
1122			return st.Err()
1123		}
1124	}
1125
1126	// If cp is set, use it.  Otherwise, attempt to compress the response using
1127	// the incoming message compression method.
1128	//
1129	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1130	if s.opts.cp != nil {
1131		ss.cp = s.opts.cp
1132		stream.SetSendCompress(s.opts.cp.Type())
1133	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1134		// Legacy compressor not specified; attempt to respond with same encoding.
1135		ss.comp = encoding.GetCompressor(rc)
1136		if ss.comp != nil {
1137			stream.SetSendCompress(rc)
1138		}
1139	}
1140
1141	if trInfo != nil {
1142		trInfo.tr.LazyLog(&trInfo.firstLine, false)
1143		defer func() {
1144			ss.mu.Lock()
1145			if err != nil && err != io.EOF {
1146				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1147				ss.trInfo.tr.SetError()
1148			}
1149			ss.trInfo.tr.Finish()
1150			ss.trInfo.tr = nil
1151			ss.mu.Unlock()
1152		}()
1153	}
1154	var appErr error
1155	var server interface{}
1156	if srv != nil {
1157		server = srv.server
1158	}
1159	if s.opts.streamInt == nil {
1160		appErr = sd.Handler(server, ss)
1161	} else {
1162		info := &StreamServerInfo{
1163			FullMethod:     stream.Method(),
1164			IsClientStream: sd.ClientStreams,
1165			IsServerStream: sd.ServerStreams,
1166		}
1167		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1168	}
1169	if appErr != nil {
1170		appStatus, ok := status.FromError(appErr)
1171		if !ok {
1172			switch err := appErr.(type) {
1173			case transport.StreamError:
1174				appStatus = status.New(err.Code, err.Desc)
1175			default:
1176				appStatus = status.New(codes.Unknown, appErr.Error())
1177			}
1178			appErr = appStatus.Err()
1179		}
1180		if trInfo != nil {
1181			ss.mu.Lock()
1182			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1183			ss.trInfo.tr.SetError()
1184			ss.mu.Unlock()
1185		}
1186		t.WriteStatus(ss.s, appStatus)
1187		// TODO: Should we log an error from WriteStatus here and below?
1188		return appErr
1189	}
1190	if trInfo != nil {
1191		ss.mu.Lock()
1192		ss.trInfo.tr.LazyLog(stringer("OK"), false)
1193		ss.mu.Unlock()
1194	}
1195	return t.WriteStatus(ss.s, status.New(codes.OK, ""))
1196}
1197
1198func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1199	sm := stream.Method()
1200	if sm != "" && sm[0] == '/' {
1201		sm = sm[1:]
1202	}
1203	pos := strings.LastIndex(sm, "/")
1204	if pos == -1 {
1205		if trInfo != nil {
1206			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1207			trInfo.tr.SetError()
1208		}
1209		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1210		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1211			if trInfo != nil {
1212				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1213				trInfo.tr.SetError()
1214			}
1215			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1216		}
1217		if trInfo != nil {
1218			trInfo.tr.Finish()
1219		}
1220		return
1221	}
1222	service := sm[:pos]
1223	method := sm[pos+1:]
1224	srv, ok := s.m[service]
1225	if !ok {
1226		if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1227			s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1228			return
1229		}
1230		if trInfo != nil {
1231			trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
1232			trInfo.tr.SetError()
1233		}
1234		errDesc := fmt.Sprintf("unknown service %v", service)
1235		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1236			if trInfo != nil {
1237				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1238				trInfo.tr.SetError()
1239			}
1240			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1241		}
1242		if trInfo != nil {
1243			trInfo.tr.Finish()
1244		}
1245		return
1246	}
1247	// Unary RPC or Streaming RPC?
1248	if md, ok := srv.md[method]; ok {
1249		s.processUnaryRPC(t, stream, srv, md, trInfo)
1250		return
1251	}
1252	if sd, ok := srv.sd[method]; ok {
1253		s.processStreamingRPC(t, stream, srv, sd, trInfo)
1254		return
1255	}
1256	if trInfo != nil {
1257		trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
1258		trInfo.tr.SetError()
1259	}
1260	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1261		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1262		return
1263	}
1264	errDesc := fmt.Sprintf("unknown method %v", method)
1265	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1266		if trInfo != nil {
1267			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1268			trInfo.tr.SetError()
1269		}
1270		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1271	}
1272	if trInfo != nil {
1273		trInfo.tr.Finish()
1274	}
1275}
1276
// streamKey is the context key type under which a ServerTransportStream is
// stored by NewContextWithServerTransportStream and looked up by
// ServerTransportStreamFromContext. It is an unexported empty struct, so the
// key streamKey{} cannot be constructed outside this package.
type streamKey struct{}
1279
1280// NewContextWithServerTransportStream creates a new context from ctx and
1281// attaches stream to it.
1282//
1283// This API is EXPERIMENTAL.
1284func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1285	return context.WithValue(ctx, streamKey{}, stream)
1286}
1287
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method returns the method string of the stream; the package-level
	// Method function returns this value for a server context.
	Method() string
	// SetHeader is what the package-level SetHeader delegates to for
	// non-empty metadata.
	SetHeader(md metadata.MD) error
	// SendHeader is what the package-level SendHeader delegates to.
	SendHeader(md metadata.MD) error
	// SetTrailer is what the package-level SetTrailer delegates to for
	// non-empty metadata.
	SetTrailer(md metadata.MD) error
}
1302
1303// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1304// ctx. Returns nil if the given context has no stream associated with it
1305// (which implies it is not an RPC invocation context).
1306//
1307// This API is EXPERIMENTAL.
1308func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1309	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1310	return s
1311}
1312
1313// Stop stops the gRPC server. It immediately closes all open
1314// connections and listeners.
1315// It cancels all active RPCs on the server side and the corresponding
1316// pending RPCs on the client side will get notified by connection
1317// errors.
1318func (s *Server) Stop() {
1319	s.quitOnce.Do(func() {
1320		close(s.quit)
1321	})
1322
1323	defer func() {
1324		s.serveWG.Wait()
1325		s.doneOnce.Do(func() {
1326			close(s.done)
1327		})
1328	}()
1329
1330	s.channelzRemoveOnce.Do(func() {
1331		if channelz.IsOn() {
1332			channelz.RemoveEntry(s.channelzID)
1333		}
1334	})
1335
1336	s.mu.Lock()
1337	listeners := s.lis
1338	s.lis = nil
1339	st := s.conns
1340	s.conns = nil
1341	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1342	s.cv.Broadcast()
1343	s.mu.Unlock()
1344
1345	for lis := range listeners {
1346		lis.Close()
1347	}
1348	for c := range st {
1349		c.Close()
1350	}
1351
1352	s.mu.Lock()
1353	if s.events != nil {
1354		s.events.Finish()
1355		s.events = nil
1356	}
1357	s.mu.Unlock()
1358}
1359
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
//
// It closes the listeners, calls Drain on every current connection, waits for
// the serving goroutines tracked by serveWG, and then waits on s.cv until the
// connection set is empty. A concurrent Stop sets s.conns to nil and
// broadcasts on s.cv, which ends that wait early.
func (s *Server) GracefulStop() {
	// Signal quit at most once, even if Stop/GracefulStop are called
	// repeatedly.
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	// Signal done on every return path, at most once.
	defer func() {
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	// s.conns is set to nil by Stop and at the end of this function, so a nil
	// value here means the server has already been stopped.
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	// Drain each connection only once, even across repeated GracefulStop
	// calls.
	if !s.drain {
		for c := range s.conns {
			c.(transport.ServerTransport).Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	// The lock is released around the wait and re-acquired afterwards.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	// Block until the connection set is empty. len of a nil map is 0, so this
	// loop also exits once Stop has cleared s.conns and broadcast on s.cv.
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
1412
1413func init() {
1414	internal.TestingUseHandlerImpl = func(arg interface{}) {
1415		arg.(*Server).opts.useHandlerImpl = true
1416	}
1417}
1418
1419// contentSubtype must be lowercase
1420// cannot return nil
1421func (s *Server) getCodec(contentSubtype string) baseCodec {
1422	if s.opts.codec != nil {
1423		return s.opts.codec
1424	}
1425	if contentSubtype == "" {
1426		return encoding.GetCodec(proto.Name)
1427	}
1428	codec := encoding.GetCodec(contentSubtype)
1429	if codec == nil {
1430		return encoding.GetCodec(proto.Name)
1431	}
1432	return codec
1433}
1434
1435// SetHeader sets the header metadata.
1436// When called multiple times, all the provided metadata will be merged.
1437// All the metadata will be sent out when one of the following happens:
1438//  - grpc.SendHeader() is called;
1439//  - The first response is sent out;
1440//  - An RPC status is sent out (error or success).
1441func SetHeader(ctx context.Context, md metadata.MD) error {
1442	if md.Len() == 0 {
1443		return nil
1444	}
1445	stream := ServerTransportStreamFromContext(ctx)
1446	if stream == nil {
1447		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1448	}
1449	return stream.SetHeader(md)
1450}
1451
1452// SendHeader sends header metadata. It may be called at most once.
1453// The provided md and headers set by SetHeader() will be sent.
1454func SendHeader(ctx context.Context, md metadata.MD) error {
1455	stream := ServerTransportStreamFromContext(ctx)
1456	if stream == nil {
1457		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1458	}
1459	if err := stream.SendHeader(md); err != nil {
1460		return toRPCErr(err)
1461	}
1462	return nil
1463}
1464
1465// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1466// When called more than once, all the provided metadata will be merged.
1467func SetTrailer(ctx context.Context, md metadata.MD) error {
1468	if md.Len() == 0 {
1469		return nil
1470	}
1471	stream := ServerTransportStreamFromContext(ctx)
1472	if stream == nil {
1473		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1474	}
1475	return stream.SetTrailer(md)
1476}
1477
1478// Method returns the method string for the server context.  The returned
1479// string is in the format of "/service/method".
1480func Method(ctx context.Context) (string, bool) {
1481	s := ServerTransportStreamFromContext(ctx)
1482	if s == nil {
1483		return "", false
1484	}
1485	return s.Method(), true
1486}
1487