1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"io"
26	"math"
27	"net"
28	"net/http"
29	"reflect"
30	"runtime"
31	"strings"
32	"sync"
33	"sync/atomic"
34	"time"
35
36	"golang.org/x/net/trace"
37
38	"google.golang.org/grpc/codes"
39	"google.golang.org/grpc/credentials"
40	"google.golang.org/grpc/encoding"
41	"google.golang.org/grpc/encoding/proto"
42	"google.golang.org/grpc/grpclog"
43	"google.golang.org/grpc/internal/binarylog"
44	"google.golang.org/grpc/internal/channelz"
45	"google.golang.org/grpc/internal/grpcsync"
46	"google.golang.org/grpc/internal/transport"
47	"google.golang.org/grpc/keepalive"
48	"google.golang.org/grpc/metadata"
49	"google.golang.org/grpc/peer"
50	"google.golang.org/grpc/stats"
51	"google.golang.org/grpc/status"
52	"google.golang.org/grpc/tap"
53)
54
// Default message size limits, installed through defaultServerOptions when the
// caller does not override them: received messages are capped at 4 MiB
// (1024 * 1024 * 4 bytes), sent messages at math.MaxInt32 bytes.
const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32
)
59
// statusOK is a status carrying codes.OK and an empty message, built once at
// package initialization.
var statusOK = status.New(codes.OK, "")
61
// methodHandler is the signature of a unary method handler stored in a
// MethodDesc. The server invokes it with the registered service
// implementation (srv), the request context (ctx), a decoding function (dec)
// that unmarshals the received request into the value it is given, and the
// configured unary interceptor, which may be nil. It returns the reply message
// and an error.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
63
// MethodDesc represents an RPC service's method specification.
//
// MethodName is the key under which the method is indexed when its service is
// registered; Handler is the function the server calls to run the method.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}
69
// ServiceDesc represents an RPC service's specification.
//
// ServiceName is the key the server uses for the service and must be unique
// per server. Methods and Streams are indexed by name at registration time.
// Metadata is stored as-is and handed back through GetServiceInfo.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}
80
// service consists of the information of the server serving this service and
// the methods in this service.
//
// md and sd index the unary and streaming descriptors by method/stream name;
// mdata is the Metadata value copied from the ServiceDesc at registration.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc
	sd     map[string]*StreamDesc
	mdata  interface{}
}
89
// Server is a gRPC server to serve RPC requests.
//
// Field notes:
//
// opts is the configuration assembled by NewServer from the defaults and the
// supplied ServerOptions.
//
// lis is the set of listeners registered by Serve; Serve treats a nil map as
// "the server has already been stopped". conns is the set of live transports;
// addConn rejects (and closes) new transports once it is nil. serve is set by
// Serve, after which registering a service is fatal. drain makes addConn drain
// every transport added while it is set. m maps a service name to its
// registered service. events is the trace event log and is only created when
// EnableTracing is set.
//
// quit and done are one-shot events: Serve returns nil instead of the Accept
// error once quit has fired, and then blocks until done fires.
// NOTE(review): channelzRemoveOnce is not used in the visible part of the
// file; presumably it guards removal of the channelz entry - confirm.
//
// channelzID is the identifier returned by channelz.RegisterServer (left at
// zero when channelz is off); czData holds the call counters reported through
// channelzMetric.
type Server struct {
	opts serverOptions

	mu     sync.Mutex // guards following
	lis    map[net.Listener]bool
	conns  map[transport.ServerTransport]bool
	serve  bool
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	events trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
111
// serverOptions is the full set of tunables a Server is built with. NewServer
// starts from defaultServerOptions and lets each ServerOption mutate a copy.
//
// Each field is written by the exported option named after it:
// creds (Creds), codec (CustomCodec), cp (RPCCompressor), dc
// (RPCDecompressor), unaryInt (UnaryInterceptor), streamInt
// (StreamInterceptor), inTapHandle (InTapHandle), statsHandler
// (StatsHandler), maxConcurrentStreams (MaxConcurrentStreams),
// maxReceiveMessageSize (MaxRecvMsgSize / MaxMsgSize), maxSendMessageSize
// (MaxSendMsgSize), unknownStreamDesc (UnknownServiceHandler),
// keepaliveParams (KeepaliveParams), keepalivePolicy
// (KeepaliveEnforcementPolicy), initialWindowSize (InitialWindowSize),
// initialConnWindowSize (InitialConnWindowSize), writeBufferSize
// (WriteBufferSize), readBufferSize (ReadBufferSize), connectionTimeout
// (ConnectionTimeout) and maxHeaderListSize (MaxHeaderListSize; nil when the
// option was not supplied).
type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
}
134
// defaultServerOptions is the baseline configuration NewServer copies before
// applying caller-supplied options: the default receive/send message size
// limits, a 120 second connection timeout, and the package default write and
// read buffer sizes. Every field not listed keeps its zero value.
var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}
142
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
//
// The single method is unexported, so implementations outside this package
// can only be built by embedding a type that already provides it.
type ServerOption interface {
	// apply mutates the given options in place. NewServer calls it once per
	// supplied option, in argument order.
	apply(*serverOptions)
}
147
// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// This API is EXPERIMENTAL.
type EmptyServerOption struct{}

// apply implements ServerOption and intentionally does nothing.
func (EmptyServerOption) apply(*serverOptions) {}
155
156// funcServerOption wraps a function that modifies serverOptions into an
157// implementation of the ServerOption interface.
158type funcServerOption struct {
159	f func(*serverOptions)
160}
161
162func (fdo *funcServerOption) apply(do *serverOptions) {
163	fdo.f(do)
164}
165
166func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
167	return &funcServerOption{
168		f: f,
169	}
170}
171
172// WriteBufferSize determines how much data can be batched before doing a write on the wire.
173// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
174// The default value for this buffer is 32KB.
175// Zero will disable the write buffer such that each write will be on underlying connection.
176// Note: A Send call may not directly translate to a write.
177func WriteBufferSize(s int) ServerOption {
178	return newFuncServerOption(func(o *serverOptions) {
179		o.writeBufferSize = s
180	})
181}
182
183// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
184// for one read syscall.
185// The default value for this buffer is 32KB.
186// Zero will disable read buffer for a connection so data framer can access the underlying
187// conn directly.
188func ReadBufferSize(s int) ServerOption {
189	return newFuncServerOption(func(o *serverOptions) {
190		o.readBufferSize = s
191	})
192}
193
194// InitialWindowSize returns a ServerOption that sets window size for stream.
195// The lower bound for window size is 64K and any value smaller than that will be ignored.
196func InitialWindowSize(s int32) ServerOption {
197	return newFuncServerOption(func(o *serverOptions) {
198		o.initialWindowSize = s
199	})
200}
201
202// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
203// The lower bound for window size is 64K and any value smaller than that will be ignored.
204func InitialConnWindowSize(s int32) ServerOption {
205	return newFuncServerOption(func(o *serverOptions) {
206		o.initialConnWindowSize = s
207	})
208}
209
210// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
211func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
212	if kp.Time > 0 && kp.Time < time.Second {
213		grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
214		kp.Time = time.Second
215	}
216
217	return newFuncServerOption(func(o *serverOptions) {
218		o.keepaliveParams = kp
219	})
220}
221
222// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
223func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
224	return newFuncServerOption(func(o *serverOptions) {
225		o.keepalivePolicy = kep
226	})
227}
228
229// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
230//
231// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
232func CustomCodec(codec Codec) ServerOption {
233	return newFuncServerOption(func(o *serverOptions) {
234		o.codec = codec
235	})
236}
237
238// RPCCompressor returns a ServerOption that sets a compressor for outbound
239// messages.  For backward compatibility, all outbound messages will be sent
240// using this compressor, regardless of incoming message compression.  By
241// default, server messages will be sent using the same compressor with which
242// request messages were sent.
243//
244// Deprecated: use encoding.RegisterCompressor instead.
245func RPCCompressor(cp Compressor) ServerOption {
246	return newFuncServerOption(func(o *serverOptions) {
247		o.cp = cp
248	})
249}
250
251// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
252// messages.  It has higher priority than decompressors registered via
253// encoding.RegisterCompressor.
254//
255// Deprecated: use encoding.RegisterCompressor instead.
256func RPCDecompressor(dc Decompressor) ServerOption {
257	return newFuncServerOption(func(o *serverOptions) {
258		o.dc = dc
259	})
260}
261
262// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
263// If this is not set, gRPC uses the default limit.
264//
265// Deprecated: use MaxRecvMsgSize instead.
266func MaxMsgSize(m int) ServerOption {
267	return MaxRecvMsgSize(m)
268}
269
270// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
271// If this is not set, gRPC uses the default 4MB.
272func MaxRecvMsgSize(m int) ServerOption {
273	return newFuncServerOption(func(o *serverOptions) {
274		o.maxReceiveMessageSize = m
275	})
276}
277
278// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
279// If this is not set, gRPC uses the default `math.MaxInt32`.
280func MaxSendMsgSize(m int) ServerOption {
281	return newFuncServerOption(func(o *serverOptions) {
282		o.maxSendMessageSize = m
283	})
284}
285
286// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
287// of concurrent streams to each ServerTransport.
288func MaxConcurrentStreams(n uint32) ServerOption {
289	return newFuncServerOption(func(o *serverOptions) {
290		o.maxConcurrentStreams = n
291	})
292}
293
294// Creds returns a ServerOption that sets credentials for server connections.
295func Creds(c credentials.TransportCredentials) ServerOption {
296	return newFuncServerOption(func(o *serverOptions) {
297		o.creds = c
298	})
299}
300
301// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
302// server. Only one unary interceptor can be installed. The construction of multiple
303// interceptors (e.g., chaining) can be implemented at the caller.
304func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
305	return newFuncServerOption(func(o *serverOptions) {
306		if o.unaryInt != nil {
307			panic("The unary server interceptor was already set and may not be reset.")
308		}
309		o.unaryInt = i
310	})
311}
312
313// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
314// server. Only one stream interceptor can be installed.
315func StreamInterceptor(i StreamServerInterceptor) ServerOption {
316	return newFuncServerOption(func(o *serverOptions) {
317		if o.streamInt != nil {
318			panic("The stream server interceptor was already set and may not be reset.")
319		}
320		o.streamInt = i
321	})
322}
323
324// InTapHandle returns a ServerOption that sets the tap handle for all the server
325// transport to be created. Only one can be installed.
326func InTapHandle(h tap.ServerInHandle) ServerOption {
327	return newFuncServerOption(func(o *serverOptions) {
328		if o.inTapHandle != nil {
329			panic("The tap handle was already set and may not be reset.")
330		}
331		o.inTapHandle = h
332	})
333}
334
335// StatsHandler returns a ServerOption that sets the stats handler for the server.
336func StatsHandler(h stats.Handler) ServerOption {
337	return newFuncServerOption(func(o *serverOptions) {
338		o.statsHandler = h
339	})
340}
341
342// UnknownServiceHandler returns a ServerOption that allows for adding a custom
343// unknown service handler. The provided method is a bidi-streaming RPC service
344// handler that will be invoked instead of returning the "unimplemented" gRPC
345// error whenever a request is received for an unregistered service or method.
346// The handling function has full access to the Context of the request and the
347// stream, and the invocation bypasses interceptors.
348func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
349	return newFuncServerOption(func(o *serverOptions) {
350		o.unknownStreamDesc = &StreamDesc{
351			StreamName: "unknown_service_handler",
352			Handler:    streamHandler,
353			// We need to assume that the users of the streamHandler will want to use both.
354			ClientStreams: true,
355			ServerStreams: true,
356		}
357	})
358}
359
360// ConnectionTimeout returns a ServerOption that sets the timeout for
361// connection establishment (up to and including HTTP/2 handshaking) for all
362// new connections.  If this is not set, the default is 120 seconds.  A zero or
363// negative value will result in an immediate timeout.
364//
365// This API is EXPERIMENTAL.
366func ConnectionTimeout(d time.Duration) ServerOption {
367	return newFuncServerOption(func(o *serverOptions) {
368		o.connectionTimeout = d
369	})
370}
371
372// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
373// of header list that the server is prepared to accept.
374func MaxHeaderListSize(s uint32) ServerOption {
375	return newFuncServerOption(func(o *serverOptions) {
376		o.maxHeaderListSize = &s
377	})
378}
379
380// NewServer creates a gRPC server which has no service registered and has not
381// started to accept requests yet.
382func NewServer(opt ...ServerOption) *Server {
383	opts := defaultServerOptions
384	for _, o := range opt {
385		o.apply(&opts)
386	}
387	s := &Server{
388		lis:    make(map[net.Listener]bool),
389		opts:   opts,
390		conns:  make(map[transport.ServerTransport]bool),
391		m:      make(map[string]*service),
392		quit:   grpcsync.NewEvent(),
393		done:   grpcsync.NewEvent(),
394		czData: new(channelzData),
395	}
396	s.cv = sync.NewCond(&s.mu)
397	if EnableTracing {
398		_, file, line, _ := runtime.Caller(1)
399		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
400	}
401
402	if channelz.IsOn() {
403		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
404	}
405	return s
406}
407
408// printf records an event in s's event log, unless s has been stopped.
409// REQUIRES s.mu is held.
410func (s *Server) printf(format string, a ...interface{}) {
411	if s.events != nil {
412		s.events.Printf(format, a...)
413	}
414}
415
416// errorf records an error in s's event log, unless s has been stopped.
417// REQUIRES s.mu is held.
418func (s *Server) errorf(format string, a ...interface{}) {
419	if s.events != nil {
420		s.events.Errorf(format, a...)
421	}
422}
423
424// RegisterService registers a service and its implementation to the gRPC
425// server. It is called from the IDL generated code. This must be called before
426// invoking Serve.
427func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
428	ht := reflect.TypeOf(sd.HandlerType).Elem()
429	st := reflect.TypeOf(ss)
430	if !st.Implements(ht) {
431		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
432	}
433	s.register(sd, ss)
434}
435
436func (s *Server) register(sd *ServiceDesc, ss interface{}) {
437	s.mu.Lock()
438	defer s.mu.Unlock()
439	s.printf("RegisterService(%q)", sd.ServiceName)
440	if s.serve {
441		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
442	}
443	if _, ok := s.m[sd.ServiceName]; ok {
444		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
445	}
446	srv := &service{
447		server: ss,
448		md:     make(map[string]*MethodDesc),
449		sd:     make(map[string]*StreamDesc),
450		mdata:  sd.Metadata,
451	}
452	for i := range sd.Methods {
453		d := &sd.Methods[i]
454		srv.md[d.MethodName] = d
455	}
456	for i := range sd.Streams {
457		d := &sd.Streams[i]
458		srv.sd[d.StreamName] = d
459	}
460	s.m[sd.ServiceName] = srv
461}
462
// MethodInfo contains the information of an RPC including its method name and type.
// Unary methods are reported with both stream flags false.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
472
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists every unary and streaming method registered for the
	// service.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
479
480// GetServiceInfo returns a map from service names to ServiceInfo.
481// Service names include the package names, in the form of <package>.<service>.
482func (s *Server) GetServiceInfo() map[string]ServiceInfo {
483	ret := make(map[string]ServiceInfo)
484	for n, srv := range s.m {
485		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
486		for m := range srv.md {
487			methods = append(methods, MethodInfo{
488				Name:           m,
489				IsClientStream: false,
490				IsServerStream: false,
491			})
492		}
493		for m, d := range srv.sd {
494			methods = append(methods, MethodInfo{
495				Name:           m,
496				IsClientStream: d.ClientStreams,
497				IsServerStream: d.ServerStreams,
498			})
499		}
500
501		ret[n] = ServiceInfo{
502			Methods:  methods,
503			Metadata: srv.mdata,
504		}
505	}
506	return ret
507}
508
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when it is called on a server
// whose listener set has already been cleared.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
512
513func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
514	if s.opts.creds == nil {
515		return rawConn, nil, nil
516	}
517	return s.opts.creds.ServerHandshake(rawConn)
518}
519
// listenSocket wraps a net.Listener so that it can be registered with
// channelz and removed from it again when the listener is closed.
type listenSocket struct {
	// Listener is the wrapped listener handed to Serve.
	net.Listener
	// channelzID is the id returned by channelz.RegisterListenSocket; it
	// stays zero when channelz is off.
	channelzID int64
}
524
525func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
526	return &channelz.SocketInternalMetric{
527		SocketOptions: channelz.GetSocketOption(l.Listener),
528		LocalAddr:     l.Listener.Addr(),
529	}
530}
531
532func (l *listenSocket) Close() error {
533	err := l.Listener.Close()
534	if channelz.IsOn() {
535		channelz.RemoveEntry(l.channelzID)
536	}
537	return err
538}
539
540// Serve accepts incoming connections on the listener lis, creating a new
541// ServerTransport and service goroutine for each. The service goroutines
542// read gRPC requests and then call the registered handlers to reply to them.
543// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
544// this method returns.
545// Serve will return a non-nil error unless Stop or GracefulStop is called.
546func (s *Server) Serve(lis net.Listener) error {
547	s.mu.Lock()
548	s.printf("serving")
549	s.serve = true
550	if s.lis == nil {
551		// Serve called after Stop or GracefulStop.
552		s.mu.Unlock()
553		lis.Close()
554		return ErrServerStopped
555	}
556
557	s.serveWG.Add(1)
558	defer func() {
559		s.serveWG.Done()
560		if s.quit.HasFired() {
561			// Stop or GracefulStop called; block until done and return nil.
562			<-s.done.Done()
563		}
564	}()
565
566	ls := &listenSocket{Listener: lis}
567	s.lis[ls] = true
568
569	if channelz.IsOn() {
570		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
571	}
572	s.mu.Unlock()
573
574	defer func() {
575		s.mu.Lock()
576		if s.lis != nil && s.lis[ls] {
577			ls.Close()
578			delete(s.lis, ls)
579		}
580		s.mu.Unlock()
581	}()
582
583	var tempDelay time.Duration // how long to sleep on accept failure
584
585	for {
586		rawConn, err := lis.Accept()
587		if err != nil {
588			if ne, ok := err.(interface {
589				Temporary() bool
590			}); ok && ne.Temporary() {
591				if tempDelay == 0 {
592					tempDelay = 5 * time.Millisecond
593				} else {
594					tempDelay *= 2
595				}
596				if max := 1 * time.Second; tempDelay > max {
597					tempDelay = max
598				}
599				s.mu.Lock()
600				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
601				s.mu.Unlock()
602				timer := time.NewTimer(tempDelay)
603				select {
604				case <-timer.C:
605				case <-s.quit.Done():
606					timer.Stop()
607					return nil
608				}
609				continue
610			}
611			s.mu.Lock()
612			s.printf("done serving; Accept = %v", err)
613			s.mu.Unlock()
614
615			if s.quit.HasFired() {
616				return nil
617			}
618			return err
619		}
620		tempDelay = 0
621		// Start a new goroutine to deal with rawConn so we don't stall this Accept
622		// loop goroutine.
623		//
624		// Make sure we account for the goroutine so GracefulStop doesn't nil out
625		// s.conns before this conn can be added.
626		s.serveWG.Add(1)
627		go func() {
628			s.handleRawConn(rawConn)
629			s.serveWG.Done()
630		}()
631	}
632}
633
634// handleRawConn forks a goroutine to handle a just-accepted connection that
635// has not had any I/O performed on it yet.
636func (s *Server) handleRawConn(rawConn net.Conn) {
637	if s.quit.HasFired() {
638		rawConn.Close()
639		return
640	}
641	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
642	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
643	if err != nil {
644		// ErrConnDispatched means that the connection was dispatched away from
645		// gRPC; those connections should be left open.
646		if err != credentials.ErrConnDispatched {
647			s.mu.Lock()
648			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
649			s.mu.Unlock()
650			grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
651			rawConn.Close()
652		}
653		rawConn.SetDeadline(time.Time{})
654		return
655	}
656
657	// Finish handshaking (HTTP2)
658	st := s.newHTTP2Transport(conn, authInfo)
659	if st == nil {
660		return
661	}
662
663	rawConn.SetDeadline(time.Time{})
664	if !s.addConn(st) {
665		return
666	}
667	go func() {
668		s.serveStreams(st)
669		s.removeConn(st)
670	}()
671}
672
673// newHTTP2Transport sets up a http/2 transport (using the
674// gRPC http2 server transport in transport/http2_server.go).
675func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
676	config := &transport.ServerConfig{
677		MaxStreams:            s.opts.maxConcurrentStreams,
678		AuthInfo:              authInfo,
679		InTapHandle:           s.opts.inTapHandle,
680		StatsHandler:          s.opts.statsHandler,
681		KeepaliveParams:       s.opts.keepaliveParams,
682		KeepalivePolicy:       s.opts.keepalivePolicy,
683		InitialWindowSize:     s.opts.initialWindowSize,
684		InitialConnWindowSize: s.opts.initialConnWindowSize,
685		WriteBufferSize:       s.opts.writeBufferSize,
686		ReadBufferSize:        s.opts.readBufferSize,
687		ChannelzParentID:      s.channelzID,
688		MaxHeaderListSize:     s.opts.maxHeaderListSize,
689	}
690	st, err := transport.NewServerTransport("http2", c, config)
691	if err != nil {
692		s.mu.Lock()
693		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
694		s.mu.Unlock()
695		c.Close()
696		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
697		return nil
698	}
699
700	return st
701}
702
703func (s *Server) serveStreams(st transport.ServerTransport) {
704	defer st.Close()
705	var wg sync.WaitGroup
706	st.HandleStreams(func(stream *transport.Stream) {
707		wg.Add(1)
708		go func() {
709			defer wg.Done()
710			s.handleStream(st, stream, s.traceInfo(st, stream))
711		}()
712	}, func(ctx context.Context, method string) context.Context {
713		if !EnableTracing {
714			return ctx
715		}
716		tr := trace.New("grpc.Recv."+methodFamily(method), method)
717		return trace.NewContext(ctx, tr)
718	})
719	wg.Wait()
720}
721
722var _ http.Handler = (*Server)(nil)
723
724// ServeHTTP implements the Go standard library's http.Handler
725// interface by responding to the gRPC request r, by looking up
726// the requested gRPC method in the gRPC server s.
727//
728// The provided HTTP request must have arrived on an HTTP/2
729// connection. When using the Go standard library's server,
730// practically this means that the Request must also have arrived
731// over TLS.
732//
733// To share one port (such as 443 for https) between gRPC and an
734// existing http.Handler, use a root http.Handler such as:
735//
736//   if r.ProtoMajor == 2 && strings.HasPrefix(
737//   	r.Header.Get("Content-Type"), "application/grpc") {
738//   	grpcServer.ServeHTTP(w, r)
739//   } else {
740//   	yourMux.ServeHTTP(w, r)
741//   }
742//
743// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
744// separate from grpc-go's HTTP/2 server. Performance and features may vary
745// between the two paths. ServeHTTP does not support some gRPC features
746// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
747// and subject to change.
748func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
749	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
750	if err != nil {
751		http.Error(w, err.Error(), http.StatusInternalServerError)
752		return
753	}
754	if !s.addConn(st) {
755		return
756	}
757	defer s.removeConn(st)
758	s.serveStreams(st)
759}
760
761// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
762// If tracing is not enabled, it returns nil.
763func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
764	if !EnableTracing {
765		return nil
766	}
767	tr, ok := trace.FromContext(stream.Context())
768	if !ok {
769		return nil
770	}
771
772	trInfo = &traceInfo{
773		tr: tr,
774		firstLine: firstLine{
775			client:     false,
776			remoteAddr: st.RemoteAddr(),
777		},
778	}
779	if dl, ok := stream.Context().Deadline(); ok {
780		trInfo.firstLine.deadline = time.Until(dl)
781	}
782	return trInfo
783}
784
785func (s *Server) addConn(st transport.ServerTransport) bool {
786	s.mu.Lock()
787	defer s.mu.Unlock()
788	if s.conns == nil {
789		st.Close()
790		return false
791	}
792	if s.drain {
793		// Transport added after we drained our existing conns: drain it
794		// immediately.
795		st.Drain()
796	}
797	s.conns[st] = true
798	return true
799}
800
801func (s *Server) removeConn(st transport.ServerTransport) {
802	s.mu.Lock()
803	defer s.mu.Unlock()
804	if s.conns != nil {
805		delete(s.conns, st)
806		s.cv.Broadcast()
807	}
808}
809
810func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
811	return &channelz.ServerInternalMetric{
812		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
813		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
814		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
815		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
816	}
817}
818
819func (s *Server) incrCallsStarted() {
820	atomic.AddInt64(&s.czData.callsStarted, 1)
821	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
822}
823
824func (s *Server) incrCallsSucceeded() {
825	atomic.AddInt64(&s.czData.callsSucceeded, 1)
826}
827
828func (s *Server) incrCallsFailed() {
829	atomic.AddInt64(&s.czData.callsFailed, 1)
830}
831
832func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
833	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
834	if err != nil {
835		grpclog.Errorln("grpc: server failed to encode response: ", err)
836		return err
837	}
838	compData, err := compress(data, cp, comp)
839	if err != nil {
840		grpclog.Errorln("grpc: server failed to compress response: ", err)
841		return err
842	}
843	hdr, payload := msgHeader(data, compData)
844	// TODO(dfawley): should we be checking len(data) instead?
845	if len(payload) > s.opts.maxSendMessageSize {
846		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
847	}
848	err = t.Write(stream, hdr, payload, opts)
849	if err == nil && s.opts.statsHandler != nil {
850		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
851	}
852	return err
853}
854
855func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
856	if channelz.IsOn() {
857		s.incrCallsStarted()
858		defer func() {
859			if err != nil && err != io.EOF {
860				s.incrCallsFailed()
861			} else {
862				s.incrCallsSucceeded()
863			}
864		}()
865	}
866	sh := s.opts.statsHandler
867	if sh != nil {
868		beginTime := time.Now()
869		begin := &stats.Begin{
870			BeginTime: beginTime,
871		}
872		sh.HandleRPC(stream.Context(), begin)
873		defer func() {
874			end := &stats.End{
875				BeginTime: beginTime,
876				EndTime:   time.Now(),
877			}
878			if err != nil && err != io.EOF {
879				end.Error = toRPCErr(err)
880			}
881			sh.HandleRPC(stream.Context(), end)
882		}()
883	}
884	if trInfo != nil {
885		defer trInfo.tr.Finish()
886		trInfo.tr.LazyLog(&trInfo.firstLine, false)
887		defer func() {
888			if err != nil && err != io.EOF {
889				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
890				trInfo.tr.SetError()
891			}
892		}()
893	}
894
895	binlog := binarylog.GetMethodLogger(stream.Method())
896	if binlog != nil {
897		ctx := stream.Context()
898		md, _ := metadata.FromIncomingContext(ctx)
899		logEntry := &binarylog.ClientHeader{
900			Header:     md,
901			MethodName: stream.Method(),
902			PeerAddr:   nil,
903		}
904		if deadline, ok := ctx.Deadline(); ok {
905			logEntry.Timeout = time.Until(deadline)
906			if logEntry.Timeout < 0 {
907				logEntry.Timeout = 0
908			}
909		}
910		if a := md[":authority"]; len(a) > 0 {
911			logEntry.Authority = a[0]
912		}
913		if peer, ok := peer.FromContext(ctx); ok {
914			logEntry.PeerAddr = peer.Addr
915		}
916		binlog.Log(logEntry)
917	}
918
919	// comp and cp are used for compression.  decomp and dc are used for
920	// decompression.  If comp and decomp are both set, they are the same;
921	// however they are kept separate to ensure that at most one of the
922	// compressor/decompressor variable pairs are set for use later.
923	var comp, decomp encoding.Compressor
924	var cp Compressor
925	var dc Decompressor
926
927	// If dc is set and matches the stream's compression, use it.  Otherwise, try
928	// to find a matching registered compressor for decomp.
929	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
930		dc = s.opts.dc
931	} else if rc != "" && rc != encoding.Identity {
932		decomp = encoding.GetCompressor(rc)
933		if decomp == nil {
934			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
935			t.WriteStatus(stream, st)
936			return st.Err()
937		}
938	}
939
940	// If cp is set, use it.  Otherwise, attempt to compress the response using
941	// the incoming message compression method.
942	//
943	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
944	if s.opts.cp != nil {
945		cp = s.opts.cp
946		stream.SetSendCompress(cp.Type())
947	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
948		// Legacy compressor not specified; attempt to respond with same encoding.
949		comp = encoding.GetCompressor(rc)
950		if comp != nil {
951			stream.SetSendCompress(rc)
952		}
953	}
954
955	var payInfo *payloadInfo
956	if sh != nil || binlog != nil {
957		payInfo = &payloadInfo{}
958	}
959	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
960	if err != nil {
961		if st, ok := status.FromError(err); ok {
962			if e := t.WriteStatus(stream, st); e != nil {
963				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
964			}
965		}
966		return err
967	}
968	if channelz.IsOn() {
969		t.IncrMsgRecv()
970	}
971	df := func(v interface{}) error {
972		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
973			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
974		}
975		if sh != nil {
976			sh.HandleRPC(stream.Context(), &stats.InPayload{
977				RecvTime:   time.Now(),
978				Payload:    v,
979				WireLength: payInfo.wireLength,
980				Data:       d,
981				Length:     len(d),
982			})
983		}
984		if binlog != nil {
985			binlog.Log(&binarylog.ClientMessage{
986				Message: d,
987			})
988		}
989		if trInfo != nil {
990			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
991		}
992		return nil
993	}
994	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
995	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
996	if appErr != nil {
997		appStatus, ok := status.FromError(appErr)
998		if !ok {
999			// Convert appErr if it is not a grpc status error.
1000			appErr = status.Error(codes.Unknown, appErr.Error())
1001			appStatus, _ = status.FromError(appErr)
1002		}
1003		if trInfo != nil {
1004			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1005			trInfo.tr.SetError()
1006		}
1007		if e := t.WriteStatus(stream, appStatus); e != nil {
1008			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1009		}
1010		if binlog != nil {
1011			if h, _ := stream.Header(); h.Len() > 0 {
1012				// Only log serverHeader if there was header. Otherwise it can
1013				// be trailer only.
1014				binlog.Log(&binarylog.ServerHeader{
1015					Header: h,
1016				})
1017			}
1018			binlog.Log(&binarylog.ServerTrailer{
1019				Trailer: stream.Trailer(),
1020				Err:     appErr,
1021			})
1022		}
1023		return appErr
1024	}
1025	if trInfo != nil {
1026		trInfo.tr.LazyLog(stringer("OK"), false)
1027	}
1028	opts := &transport.Options{Last: true}
1029
1030	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1031		if err == io.EOF {
1032			// The entire stream is done (for unary RPC only).
1033			return err
1034		}
1035		if s, ok := status.FromError(err); ok {
1036			if e := t.WriteStatus(stream, s); e != nil {
1037				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1038			}
1039		} else {
1040			switch st := err.(type) {
1041			case transport.ConnectionError:
1042				// Nothing to do here.
1043			default:
1044				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1045			}
1046		}
1047		if binlog != nil {
1048			h, _ := stream.Header()
1049			binlog.Log(&binarylog.ServerHeader{
1050				Header: h,
1051			})
1052			binlog.Log(&binarylog.ServerTrailer{
1053				Trailer: stream.Trailer(),
1054				Err:     appErr,
1055			})
1056		}
1057		return err
1058	}
1059	if binlog != nil {
1060		h, _ := stream.Header()
1061		binlog.Log(&binarylog.ServerHeader{
1062			Header: h,
1063		})
1064		binlog.Log(&binarylog.ServerMessage{
1065			Message: reply,
1066		})
1067	}
1068	if channelz.IsOn() {
1069		t.IncrMsgSent()
1070	}
1071	if trInfo != nil {
1072		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1073	}
1074	// TODO: Should we be logging if writing status failed here, like above?
1075	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
1076	// error or allow the stats handler to see it?
1077	err = t.WriteStatus(stream, statusOK)
1078	if binlog != nil {
1079		binlog.Log(&binarylog.ServerTrailer{
1080			Trailer: stream.Trailer(),
1081			Err:     appErr,
1082		})
1083	}
1084	return err
1085}
1086
1087func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1088	if channelz.IsOn() {
1089		s.incrCallsStarted()
1090		defer func() {
1091			if err != nil && err != io.EOF {
1092				s.incrCallsFailed()
1093			} else {
1094				s.incrCallsSucceeded()
1095			}
1096		}()
1097	}
1098	sh := s.opts.statsHandler
1099	if sh != nil {
1100		beginTime := time.Now()
1101		begin := &stats.Begin{
1102			BeginTime: beginTime,
1103		}
1104		sh.HandleRPC(stream.Context(), begin)
1105		defer func() {
1106			end := &stats.End{
1107				BeginTime: beginTime,
1108				EndTime:   time.Now(),
1109			}
1110			if err != nil && err != io.EOF {
1111				end.Error = toRPCErr(err)
1112			}
1113			sh.HandleRPC(stream.Context(), end)
1114		}()
1115	}
1116	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1117	ss := &serverStream{
1118		ctx:                   ctx,
1119		t:                     t,
1120		s:                     stream,
1121		p:                     &parser{r: stream},
1122		codec:                 s.getCodec(stream.ContentSubtype()),
1123		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1124		maxSendMessageSize:    s.opts.maxSendMessageSize,
1125		trInfo:                trInfo,
1126		statsHandler:          sh,
1127	}
1128
1129	ss.binlog = binarylog.GetMethodLogger(stream.Method())
1130	if ss.binlog != nil {
1131		md, _ := metadata.FromIncomingContext(ctx)
1132		logEntry := &binarylog.ClientHeader{
1133			Header:     md,
1134			MethodName: stream.Method(),
1135			PeerAddr:   nil,
1136		}
1137		if deadline, ok := ctx.Deadline(); ok {
1138			logEntry.Timeout = time.Until(deadline)
1139			if logEntry.Timeout < 0 {
1140				logEntry.Timeout = 0
1141			}
1142		}
1143		if a := md[":authority"]; len(a) > 0 {
1144			logEntry.Authority = a[0]
1145		}
1146		if peer, ok := peer.FromContext(ss.Context()); ok {
1147			logEntry.PeerAddr = peer.Addr
1148		}
1149		ss.binlog.Log(logEntry)
1150	}
1151
1152	// If dc is set and matches the stream's compression, use it.  Otherwise, try
1153	// to find a matching registered compressor for decomp.
1154	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1155		ss.dc = s.opts.dc
1156	} else if rc != "" && rc != encoding.Identity {
1157		ss.decomp = encoding.GetCompressor(rc)
1158		if ss.decomp == nil {
1159			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1160			t.WriteStatus(ss.s, st)
1161			return st.Err()
1162		}
1163	}
1164
1165	// If cp is set, use it.  Otherwise, attempt to compress the response using
1166	// the incoming message compression method.
1167	//
1168	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1169	if s.opts.cp != nil {
1170		ss.cp = s.opts.cp
1171		stream.SetSendCompress(s.opts.cp.Type())
1172	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1173		// Legacy compressor not specified; attempt to respond with same encoding.
1174		ss.comp = encoding.GetCompressor(rc)
1175		if ss.comp != nil {
1176			stream.SetSendCompress(rc)
1177		}
1178	}
1179
1180	if trInfo != nil {
1181		trInfo.tr.LazyLog(&trInfo.firstLine, false)
1182		defer func() {
1183			ss.mu.Lock()
1184			if err != nil && err != io.EOF {
1185				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1186				ss.trInfo.tr.SetError()
1187			}
1188			ss.trInfo.tr.Finish()
1189			ss.trInfo.tr = nil
1190			ss.mu.Unlock()
1191		}()
1192	}
1193	var appErr error
1194	var server interface{}
1195	if srv != nil {
1196		server = srv.server
1197	}
1198	if s.opts.streamInt == nil {
1199		appErr = sd.Handler(server, ss)
1200	} else {
1201		info := &StreamServerInfo{
1202			FullMethod:     stream.Method(),
1203			IsClientStream: sd.ClientStreams,
1204			IsServerStream: sd.ServerStreams,
1205		}
1206		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1207	}
1208	if appErr != nil {
1209		appStatus, ok := status.FromError(appErr)
1210		if !ok {
1211			appStatus = status.New(codes.Unknown, appErr.Error())
1212			appErr = appStatus.Err()
1213		}
1214		if trInfo != nil {
1215			ss.mu.Lock()
1216			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1217			ss.trInfo.tr.SetError()
1218			ss.mu.Unlock()
1219		}
1220		t.WriteStatus(ss.s, appStatus)
1221		if ss.binlog != nil {
1222			ss.binlog.Log(&binarylog.ServerTrailer{
1223				Trailer: ss.s.Trailer(),
1224				Err:     appErr,
1225			})
1226		}
1227		// TODO: Should we log an error from WriteStatus here and below?
1228		return appErr
1229	}
1230	if trInfo != nil {
1231		ss.mu.Lock()
1232		ss.trInfo.tr.LazyLog(stringer("OK"), false)
1233		ss.mu.Unlock()
1234	}
1235	err = t.WriteStatus(ss.s, statusOK)
1236	if ss.binlog != nil {
1237		ss.binlog.Log(&binarylog.ServerTrailer{
1238			Trailer: ss.s.Trailer(),
1239			Err:     appErr,
1240		})
1241	}
1242	return err
1243}
1244
1245func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1246	sm := stream.Method()
1247	if sm != "" && sm[0] == '/' {
1248		sm = sm[1:]
1249	}
1250	pos := strings.LastIndex(sm, "/")
1251	if pos == -1 {
1252		if trInfo != nil {
1253			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1254			trInfo.tr.SetError()
1255		}
1256		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1257		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1258			if trInfo != nil {
1259				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1260				trInfo.tr.SetError()
1261			}
1262			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1263		}
1264		if trInfo != nil {
1265			trInfo.tr.Finish()
1266		}
1267		return
1268	}
1269	service := sm[:pos]
1270	method := sm[pos+1:]
1271
1272	srv, knownService := s.m[service]
1273	if knownService {
1274		if md, ok := srv.md[method]; ok {
1275			s.processUnaryRPC(t, stream, srv, md, trInfo)
1276			return
1277		}
1278		if sd, ok := srv.sd[method]; ok {
1279			s.processStreamingRPC(t, stream, srv, sd, trInfo)
1280			return
1281		}
1282	}
1283	// Unknown service, or known server unknown method.
1284	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1285		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1286		return
1287	}
1288	var errDesc string
1289	if !knownService {
1290		errDesc = fmt.Sprintf("unknown service %v", service)
1291	} else {
1292		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1293	}
1294	if trInfo != nil {
1295		trInfo.tr.LazyPrintf("%s", errDesc)
1296		trInfo.tr.SetError()
1297	}
1298	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1299		if trInfo != nil {
1300			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1301			trInfo.tr.SetError()
1302		}
1303		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1304	}
1305	if trInfo != nil {
1306		trInfo.tr.Finish()
1307	}
1308}
1309
// streamKey is the unexported context key type under which a
// ServerTransportStream is stored in a context. It is written by
// NewContextWithServerTransportStream and read by
// ServerTransportStreamFromContext.
type streamKey struct{}
1312
1313// NewContextWithServerTransportStream creates a new context from ctx and
1314// attaches stream to it.
1315//
1316// This API is EXPERIMENTAL.
1317func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1318	return context.WithValue(ctx, streamKey{}, stream)
1319}
1320
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method returns the RPC method string; it backs the package-level
	// Method function.
	Method() string
	// SetHeader is called by the package-level SetHeader function with the
	// header metadata to record.
	SetHeader(md metadata.MD) error
	// SendHeader is called by the package-level SendHeader function with the
	// header metadata to send.
	SendHeader(md metadata.MD) error
	// SetTrailer is called by the package-level SetTrailer function with the
	// trailer metadata to record.
	SetTrailer(md metadata.MD) error
}
1335
1336// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1337// ctx. Returns nil if the given context has no stream associated with it
1338// (which implies it is not an RPC invocation context).
1339//
1340// This API is EXPERIMENTAL.
1341func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1342	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1343	return s
1344}
1345
1346// Stop stops the gRPC server. It immediately closes all open
1347// connections and listeners.
1348// It cancels all active RPCs on the server side and the corresponding
1349// pending RPCs on the client side will get notified by connection
1350// errors.
1351func (s *Server) Stop() {
1352	s.quit.Fire()
1353
1354	defer func() {
1355		s.serveWG.Wait()
1356		s.done.Fire()
1357	}()
1358
1359	s.channelzRemoveOnce.Do(func() {
1360		if channelz.IsOn() {
1361			channelz.RemoveEntry(s.channelzID)
1362		}
1363	})
1364
1365	s.mu.Lock()
1366	listeners := s.lis
1367	s.lis = nil
1368	st := s.conns
1369	s.conns = nil
1370	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1371	s.cv.Broadcast()
1372	s.mu.Unlock()
1373
1374	for lis := range listeners {
1375		lis.Close()
1376	}
1377	for c := range st {
1378		c.Close()
1379	}
1380
1381	s.mu.Lock()
1382	if s.events != nil {
1383		s.events.Finish()
1384		s.events = nil
1385	}
1386	s.mu.Unlock()
1387}
1388
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
//
// If s.conns is already nil when the lock is acquired (Stop and a completed
// GracefulStop both leave it nil), the call returns without further work.
func (s *Server) GracefulStop() {
	s.quit.Fire()
	// Signal completion on every return path, including the early return below.
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	// A nil conns set means the server has already been stopped.
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	// Close all listeners so that no new connections are accepted.
	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	// Drain every current connection, but only once per server.
	if !s.drain {
		for st := range s.conns {
			st.Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	// The lock is released around the wait and re-acquired afterwards.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	// Block until the connection set is empty. Stop sets s.conns to nil and
	// broadcasts on s.cv, which also ends this loop.
	// NOTE(review): presumably s.cv is a condition variable bound to s.mu — confirm.
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
1434
1435// contentSubtype must be lowercase
1436// cannot return nil
1437func (s *Server) getCodec(contentSubtype string) baseCodec {
1438	if s.opts.codec != nil {
1439		return s.opts.codec
1440	}
1441	if contentSubtype == "" {
1442		return encoding.GetCodec(proto.Name)
1443	}
1444	codec := encoding.GetCodec(contentSubtype)
1445	if codec == nil {
1446		return encoding.GetCodec(proto.Name)
1447	}
1448	return codec
1449}
1450
1451// SetHeader sets the header metadata.
1452// When called multiple times, all the provided metadata will be merged.
1453// All the metadata will be sent out when one of the following happens:
1454//  - grpc.SendHeader() is called;
1455//  - The first response is sent out;
1456//  - An RPC status is sent out (error or success).
1457func SetHeader(ctx context.Context, md metadata.MD) error {
1458	if md.Len() == 0 {
1459		return nil
1460	}
1461	stream := ServerTransportStreamFromContext(ctx)
1462	if stream == nil {
1463		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1464	}
1465	return stream.SetHeader(md)
1466}
1467
1468// SendHeader sends header metadata. It may be called at most once.
1469// The provided md and headers set by SetHeader() will be sent.
1470func SendHeader(ctx context.Context, md metadata.MD) error {
1471	stream := ServerTransportStreamFromContext(ctx)
1472	if stream == nil {
1473		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1474	}
1475	if err := stream.SendHeader(md); err != nil {
1476		return toRPCErr(err)
1477	}
1478	return nil
1479}
1480
1481// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1482// When called more than once, all the provided metadata will be merged.
1483func SetTrailer(ctx context.Context, md metadata.MD) error {
1484	if md.Len() == 0 {
1485		return nil
1486	}
1487	stream := ServerTransportStreamFromContext(ctx)
1488	if stream == nil {
1489		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1490	}
1491	return stream.SetTrailer(md)
1492}
1493
1494// Method returns the method string for the server context.  The returned
1495// string is in the format of "/service/method".
1496func Method(ctx context.Context) (string, bool) {
1497	s := ServerTransportStreamFromContext(ctx)
1498	if s == nil {
1499		return "", false
1500	}
1501	return s.Method(), true
1502}
1503
// channelzServer wraps a *Server; its ChannelzMetric method forwards to the
// wrapped server's channelzMetric method.
type channelzServer struct {
	// s is the server whose metrics are reported.
	s *Server
}
1507
1508func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1509	return c.s.channelzMetric()
1510}
1511