1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"io"
26	"math"
27	"net"
28	"net/http"
29	"reflect"
30	"runtime"
31	"strings"
32	"sync"
33	"sync/atomic"
34	"time"
35
36	"golang.org/x/net/trace"
37
38	"google.golang.org/grpc/codes"
39	"google.golang.org/grpc/credentials"
40	"google.golang.org/grpc/encoding"
41	"google.golang.org/grpc/encoding/proto"
42	"google.golang.org/grpc/grpclog"
43	"google.golang.org/grpc/internal/binarylog"
44	"google.golang.org/grpc/internal/channelz"
45	"google.golang.org/grpc/internal/transport"
46	"google.golang.org/grpc/keepalive"
47	"google.golang.org/grpc/metadata"
48	"google.golang.org/grpc/peer"
49	"google.golang.org/grpc/stats"
50	"google.golang.org/grpc/status"
51	"google.golang.org/grpc/tap"
52)
53
// Default message size limits used to populate defaultServerOptions.
const (
	// defaultServerMaxReceiveMessageSize is the default cap on a received
	// message: 4 * 1024 * 1024.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultServerMaxSendMessageSize is the default cap on a sent message.
	defaultServerMaxSendMessageSize    = math.MaxInt32
)
58
// methodHandler is the function type stored in MethodDesc.Handler. The server
// calls it with the registered service implementation (srv), the RPC context,
// a dec callback that unmarshals the request into the value it is given, and
// the configured unary interceptor. It returns the reply message or an error.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
60
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the key under which the method is stored when its service
	// is registered.
	MethodName string
	// Handler is the function invoked to process a unary call to this method.
	Handler    methodHandler
}
66
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	// ServiceName is the name the service is registered under on the server.
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	// Methods describes the service's unary methods.
	Methods     []MethodDesc
	// Streams describes the service's streaming methods.
	Streams     []StreamDesc
	// Metadata is stored with the registered service and reported back
	// through ServiceInfo.Metadata.
	Metadata    interface{}
}
77
// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{} // the server for service methods
	// md maps a unary method name to its descriptor.
	md     map[string]*MethodDesc
	// sd maps a streaming method name to its descriptor.
	sd     map[string]*StreamDesc
	// mdata is the Metadata value copied from the ServiceDesc at registration.
	mdata  interface{}
}
86
// Server is a gRPC server to serve RPC requests.
type Server struct {
	// opts is the configuration assembled by NewServer from the supplied
	// ServerOptions.
	opts serverOptions

	mu     sync.Mutex // guards following
	// lis is the set of listeners currently being served. Serve treats a nil
	// map as "server already stopped".
	lis    map[net.Listener]bool
	// conns is the set of live transports. addConn and handleRawConn treat a
	// nil map as "server already stopped".
	conns  map[transport.ServerTransport]bool
	// serve is set once Serve has been called; register rejects new services
	// after that.
	serve  bool
	// drain, when set, makes addConn drain every newly added transport.
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	// events is the trace event log; it is only created when EnableTracing is
	// set, so it may be nil.
	events trace.EventLog

	// quit is observed by Serve to detect that Stop or GracefulStop was called;
	// Serve then waits on done before returning.
	quit               chan struct{}
	done               chan struct{}
	// NOTE(review): the Once values presumably guard one-time closing of
	// quit/done and channelz removal; their use is outside this view.
	quitOnce           sync.Once
	doneOnce           sync.Once
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	// czData holds the call counters updated atomically by the incrCalls*
	// helpers.
	czData     *channelzData
}
110
// serverOptions is the full set of server configuration knobs. NewServer
// starts from a copy of defaultServerOptions and lets each ServerOption
// mutate it.
type serverOptions struct {
	creds                 credentials.TransportCredentials // nil means no transport handshake is performed
	codec                 baseCodec
	cp                    Compressor   // legacy outbound compressor (RPCCompressor)
	dc                    Decompressor // legacy inbound decompressor (RPCDecompressor)
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc // handler for unregistered services/methods, if any
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration // deadline applied to a new connection until handshaking completes
	maxHeaderListSize     *uint32       // nil unless MaxHeaderListSize was supplied
}
133
// defaultServerOptions is the baseline configuration that NewServer copies
// before applying caller-supplied options. Fields not listed here start at
// their zero value.
var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}
141
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	// apply mutates the given serverOptions in place. Because the method is
	// unexported, only types declared in this package (or types embedding
	// one of them) can satisfy the interface.
	apply(*serverOptions)
}
146
// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// This API is EXPERIMENTAL.
type EmptyServerOption struct{}

// apply implements ServerOption and intentionally does nothing.
func (EmptyServerOption) apply(*serverOptions) {}
154
// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	// f is the mutation run against the options when apply is called.
	f func(*serverOptions)
}
160
161func (fdo *funcServerOption) apply(do *serverOptions) {
162	fdo.f(do)
163}
164
165func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
166	return &funcServerOption{
167		f: f,
168	}
169}
170
171// WriteBufferSize determines how much data can be batched before doing a write on the wire.
172// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
173// The default value for this buffer is 32KB.
174// Zero will disable the write buffer such that each write will be on underlying connection.
175// Note: A Send call may not directly translate to a write.
176func WriteBufferSize(s int) ServerOption {
177	return newFuncServerOption(func(o *serverOptions) {
178		o.writeBufferSize = s
179	})
180}
181
182// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
183// for one read syscall.
184// The default value for this buffer is 32KB.
185// Zero will disable read buffer for a connection so data framer can access the underlying
186// conn directly.
187func ReadBufferSize(s int) ServerOption {
188	return newFuncServerOption(func(o *serverOptions) {
189		o.readBufferSize = s
190	})
191}
192
193// InitialWindowSize returns a ServerOption that sets window size for stream.
194// The lower bound for window size is 64K and any value smaller than that will be ignored.
195func InitialWindowSize(s int32) ServerOption {
196	return newFuncServerOption(func(o *serverOptions) {
197		o.initialWindowSize = s
198	})
199}
200
201// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
202// The lower bound for window size is 64K and any value smaller than that will be ignored.
203func InitialConnWindowSize(s int32) ServerOption {
204	return newFuncServerOption(func(o *serverOptions) {
205		o.initialConnWindowSize = s
206	})
207}
208
209// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
210func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
211	if kp.Time > 0 && kp.Time < time.Second {
212		grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
213		kp.Time = time.Second
214	}
215
216	return newFuncServerOption(func(o *serverOptions) {
217		o.keepaliveParams = kp
218	})
219}
220
221// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
222func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
223	return newFuncServerOption(func(o *serverOptions) {
224		o.keepalivePolicy = kep
225	})
226}
227
228// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
229//
230// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
231func CustomCodec(codec Codec) ServerOption {
232	return newFuncServerOption(func(o *serverOptions) {
233		o.codec = codec
234	})
235}
236
237// RPCCompressor returns a ServerOption that sets a compressor for outbound
238// messages.  For backward compatibility, all outbound messages will be sent
239// using this compressor, regardless of incoming message compression.  By
240// default, server messages will be sent using the same compressor with which
241// request messages were sent.
242//
243// Deprecated: use encoding.RegisterCompressor instead.
244func RPCCompressor(cp Compressor) ServerOption {
245	return newFuncServerOption(func(o *serverOptions) {
246		o.cp = cp
247	})
248}
249
250// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
251// messages.  It has higher priority than decompressors registered via
252// encoding.RegisterCompressor.
253//
254// Deprecated: use encoding.RegisterCompressor instead.
255func RPCDecompressor(dc Decompressor) ServerOption {
256	return newFuncServerOption(func(o *serverOptions) {
257		o.dc = dc
258	})
259}
260
261// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
262// If this is not set, gRPC uses the default limit.
263//
264// Deprecated: use MaxRecvMsgSize instead.
265func MaxMsgSize(m int) ServerOption {
266	return MaxRecvMsgSize(m)
267}
268
269// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
270// If this is not set, gRPC uses the default 4MB.
271func MaxRecvMsgSize(m int) ServerOption {
272	return newFuncServerOption(func(o *serverOptions) {
273		o.maxReceiveMessageSize = m
274	})
275}
276
277// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
278// If this is not set, gRPC uses the default `math.MaxInt32`.
279func MaxSendMsgSize(m int) ServerOption {
280	return newFuncServerOption(func(o *serverOptions) {
281		o.maxSendMessageSize = m
282	})
283}
284
285// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
286// of concurrent streams to each ServerTransport.
287func MaxConcurrentStreams(n uint32) ServerOption {
288	return newFuncServerOption(func(o *serverOptions) {
289		o.maxConcurrentStreams = n
290	})
291}
292
293// Creds returns a ServerOption that sets credentials for server connections.
294func Creds(c credentials.TransportCredentials) ServerOption {
295	return newFuncServerOption(func(o *serverOptions) {
296		o.creds = c
297	})
298}
299
300// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
301// server. Only one unary interceptor can be installed. The construction of multiple
302// interceptors (e.g., chaining) can be implemented at the caller.
303func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
304	return newFuncServerOption(func(o *serverOptions) {
305		if o.unaryInt != nil {
306			panic("The unary server interceptor was already set and may not be reset.")
307		}
308		o.unaryInt = i
309	})
310}
311
312// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
313// server. Only one stream interceptor can be installed.
314func StreamInterceptor(i StreamServerInterceptor) ServerOption {
315	return newFuncServerOption(func(o *serverOptions) {
316		if o.streamInt != nil {
317			panic("The stream server interceptor was already set and may not be reset.")
318		}
319		o.streamInt = i
320	})
321}
322
323// InTapHandle returns a ServerOption that sets the tap handle for all the server
324// transport to be created. Only one can be installed.
325func InTapHandle(h tap.ServerInHandle) ServerOption {
326	return newFuncServerOption(func(o *serverOptions) {
327		if o.inTapHandle != nil {
328			panic("The tap handle was already set and may not be reset.")
329		}
330		o.inTapHandle = h
331	})
332}
333
334// StatsHandler returns a ServerOption that sets the stats handler for the server.
335func StatsHandler(h stats.Handler) ServerOption {
336	return newFuncServerOption(func(o *serverOptions) {
337		o.statsHandler = h
338	})
339}
340
341// UnknownServiceHandler returns a ServerOption that allows for adding a custom
342// unknown service handler. The provided method is a bidi-streaming RPC service
343// handler that will be invoked instead of returning the "unimplemented" gRPC
344// error whenever a request is received for an unregistered service or method.
345// The handling function has full access to the Context of the request and the
346// stream, and the invocation bypasses interceptors.
347func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
348	return newFuncServerOption(func(o *serverOptions) {
349		o.unknownStreamDesc = &StreamDesc{
350			StreamName: "unknown_service_handler",
351			Handler:    streamHandler,
352			// We need to assume that the users of the streamHandler will want to use both.
353			ClientStreams: true,
354			ServerStreams: true,
355		}
356	})
357}
358
359// ConnectionTimeout returns a ServerOption that sets the timeout for
360// connection establishment (up to and including HTTP/2 handshaking) for all
361// new connections.  If this is not set, the default is 120 seconds.  A zero or
362// negative value will result in an immediate timeout.
363//
364// This API is EXPERIMENTAL.
365func ConnectionTimeout(d time.Duration) ServerOption {
366	return newFuncServerOption(func(o *serverOptions) {
367		o.connectionTimeout = d
368	})
369}
370
371// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
372// of header list that the server is prepared to accept.
373func MaxHeaderListSize(s uint32) ServerOption {
374	return newFuncServerOption(func(o *serverOptions) {
375		o.maxHeaderListSize = &s
376	})
377}
378
379// NewServer creates a gRPC server which has no service registered and has not
380// started to accept requests yet.
381func NewServer(opt ...ServerOption) *Server {
382	opts := defaultServerOptions
383	for _, o := range opt {
384		o.apply(&opts)
385	}
386	s := &Server{
387		lis:    make(map[net.Listener]bool),
388		opts:   opts,
389		conns:  make(map[transport.ServerTransport]bool),
390		m:      make(map[string]*service),
391		quit:   make(chan struct{}),
392		done:   make(chan struct{}),
393		czData: new(channelzData),
394	}
395	s.cv = sync.NewCond(&s.mu)
396	if EnableTracing {
397		_, file, line, _ := runtime.Caller(1)
398		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
399	}
400
401	if channelz.IsOn() {
402		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
403	}
404	return s
405}
406
407// printf records an event in s's event log, unless s has been stopped.
408// REQUIRES s.mu is held.
409func (s *Server) printf(format string, a ...interface{}) {
410	if s.events != nil {
411		s.events.Printf(format, a...)
412	}
413}
414
415// errorf records an error in s's event log, unless s has been stopped.
416// REQUIRES s.mu is held.
417func (s *Server) errorf(format string, a ...interface{}) {
418	if s.events != nil {
419		s.events.Errorf(format, a...)
420	}
421}
422
423// RegisterService registers a service and its implementation to the gRPC
424// server. It is called from the IDL generated code. This must be called before
425// invoking Serve.
426func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
427	ht := reflect.TypeOf(sd.HandlerType).Elem()
428	st := reflect.TypeOf(ss)
429	if !st.Implements(ht) {
430		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
431	}
432	s.register(sd, ss)
433}
434
435func (s *Server) register(sd *ServiceDesc, ss interface{}) {
436	s.mu.Lock()
437	defer s.mu.Unlock()
438	s.printf("RegisterService(%q)", sd.ServiceName)
439	if s.serve {
440		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
441	}
442	if _, ok := s.m[sd.ServiceName]; ok {
443		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
444	}
445	srv := &service{
446		server: ss,
447		md:     make(map[string]*MethodDesc),
448		sd:     make(map[string]*StreamDesc),
449		mdata:  sd.Metadata,
450	}
451	for i := range sd.Methods {
452		d := &sd.Methods[i]
453		srv.md[d.MethodName] = d
454	}
455	for i := range sd.Streams {
456		d := &sd.Streams[i]
457		srv.sd[d.StreamName] = d
458	}
459	s.m[sd.ServiceName] = srv
460}
461
// MethodInfo contains the information of an RPC including its method name and type.
// GetServiceInfo reports unary methods with both stream flags set to false.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
471
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists the service's unary and streaming methods together.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
478
479// GetServiceInfo returns a map from service names to ServiceInfo.
480// Service names include the package names, in the form of <package>.<service>.
481func (s *Server) GetServiceInfo() map[string]ServiceInfo {
482	ret := make(map[string]ServiceInfo)
483	for n, srv := range s.m {
484		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
485		for m := range srv.md {
486			methods = append(methods, MethodInfo{
487				Name:           m,
488				IsClientStream: false,
489				IsServerStream: false,
490			})
491		}
492		for m, d := range srv.sd {
493			methods = append(methods, MethodInfo{
494				Name:           m,
495				IsClientStream: d.ClientStreams,
496				IsServerStream: d.ServerStreams,
497			})
498		}
499
500		ret[n] = ServiceInfo{
501			Methods:  methods,
502			Metadata: srv.mdata,
503		}
504	}
505	return ret
506}
507
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called on a server that has
// already been stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
511
512func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
513	if s.opts.creds == nil {
514		return rawConn, nil, nil
515	}
516	return s.opts.creds.ServerHandshake(rawConn)
517}
518
// listenSocket wraps a net.Listener together with the channelz ID it was
// registered under, so that closing the listener can also remove its channelz
// entry.
type listenSocket struct {
	net.Listener
	// channelzID is the ID returned by channelz.RegisterListenSocket; it is
	// only assigned when channelz is on.
	channelzID int64
}
523
524func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
525	return &channelz.SocketInternalMetric{
526		SocketOptions: channelz.GetSocketOption(l.Listener),
527		LocalAddr:     l.Listener.Addr(),
528	}
529}
530
531func (l *listenSocket) Close() error {
532	err := l.Listener.Close()
533	if channelz.IsOn() {
534		channelz.RemoveEntry(l.channelzID)
535	}
536	return err
537}
538
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Accept errors that report Temporary() == true are retried after a delay that
// starts at 5ms, doubles on each consecutive failure and is capped at 1s. If
// the server has already been stopped when Serve is called, lis is closed and
// ErrServerStopped is returned.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	// From here on register refuses to add services.
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Account for this Serve call in serveWG. On return, if the server is
	// being stopped, wait for the stop to complete before handing control back
	// to the caller.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	// Track the listener under its wrapper so it can be found and closed
	// later.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On return, close and forget the listener unless it has already been
	// removed from s.lis (or s.lis itself has been cleared).
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Temporary errors are retried with exponential backoff.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Sleep for the backoff period, but give up immediately if the
				// server is stopped meanwhile.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure caused by stopping the server is not reported
			// as an error.
			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
636
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
//
// It performs the transport-credentials handshake and the HTTP/2 transport
// setup under a deadline of s.opts.connectionTimeout, registers the resulting
// transport with the server, and then serves its streams in a new goroutine.
// The transport is removed from the server once serving ends.
func (s *Server) handleRawConn(rawConn net.Conn) {
	// Bound the whole handshake phase by the configured connection timeout.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		// Clear the handshake deadline so that a dispatched connection is not
		// left with it.
		rawConn.SetDeadline(time.Time{})
		return
	}

	// If the server was stopped while handshaking, drop the connection before
	// doing any more work on it.
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		conn.Close()
		return
	}
	s.mu.Unlock()

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		// newHTTP2Transport has already logged the failure and closed conn.
		return
	}

	// Handshaking is complete; lift the deadline.
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		// addConn closes st itself when the server has been stopped.
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}
679
680// newHTTP2Transport sets up a http/2 transport (using the
681// gRPC http2 server transport in transport/http2_server.go).
682func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
683	config := &transport.ServerConfig{
684		MaxStreams:            s.opts.maxConcurrentStreams,
685		AuthInfo:              authInfo,
686		InTapHandle:           s.opts.inTapHandle,
687		StatsHandler:          s.opts.statsHandler,
688		KeepaliveParams:       s.opts.keepaliveParams,
689		KeepalivePolicy:       s.opts.keepalivePolicy,
690		InitialWindowSize:     s.opts.initialWindowSize,
691		InitialConnWindowSize: s.opts.initialConnWindowSize,
692		WriteBufferSize:       s.opts.writeBufferSize,
693		ReadBufferSize:        s.opts.readBufferSize,
694		ChannelzParentID:      s.channelzID,
695		MaxHeaderListSize:     s.opts.maxHeaderListSize,
696	}
697	st, err := transport.NewServerTransport("http2", c, config)
698	if err != nil {
699		s.mu.Lock()
700		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
701		s.mu.Unlock()
702		c.Close()
703		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
704		return nil
705	}
706
707	return st
708}
709
710func (s *Server) serveStreams(st transport.ServerTransport) {
711	defer st.Close()
712	var wg sync.WaitGroup
713	st.HandleStreams(func(stream *transport.Stream) {
714		wg.Add(1)
715		go func() {
716			defer wg.Done()
717			s.handleStream(st, stream, s.traceInfo(st, stream))
718		}()
719	}, func(ctx context.Context, method string) context.Context {
720		if !EnableTracing {
721			return ctx
722		}
723		tr := trace.New("grpc.Recv."+methodFamily(method), method)
724		return trace.NewContext(ctx, tr)
725	})
726	wg.Wait()
727}
728
// Compile-time check that *Server satisfies http.Handler.
var _ http.Handler = (*Server)(nil)
730
731// ServeHTTP implements the Go standard library's http.Handler
732// interface by responding to the gRPC request r, by looking up
733// the requested gRPC method in the gRPC server s.
734//
735// The provided HTTP request must have arrived on an HTTP/2
736// connection. When using the Go standard library's server,
737// practically this means that the Request must also have arrived
738// over TLS.
739//
740// To share one port (such as 443 for https) between gRPC and an
741// existing http.Handler, use a root http.Handler such as:
742//
743//   if r.ProtoMajor == 2 && strings.HasPrefix(
744//   	r.Header.Get("Content-Type"), "application/grpc") {
745//   	grpcServer.ServeHTTP(w, r)
746//   } else {
747//   	yourMux.ServeHTTP(w, r)
748//   }
749//
750// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
751// separate from grpc-go's HTTP/2 server. Performance and features may vary
752// between the two paths. ServeHTTP does not support some gRPC features
753// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
754// and subject to change.
755func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
756	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
757	if err != nil {
758		http.Error(w, err.Error(), http.StatusInternalServerError)
759		return
760	}
761	if !s.addConn(st) {
762		return
763	}
764	defer s.removeConn(st)
765	s.serveStreams(st)
766}
767
768// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
769// If tracing is not enabled, it returns nil.
770func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
771	tr, ok := trace.FromContext(stream.Context())
772	if !ok {
773		return nil
774	}
775
776	trInfo = &traceInfo{
777		tr: tr,
778		firstLine: firstLine{
779			client:     false,
780			remoteAddr: st.RemoteAddr(),
781		},
782	}
783	if dl, ok := stream.Context().Deadline(); ok {
784		trInfo.firstLine.deadline = time.Until(dl)
785	}
786	return trInfo
787}
788
789func (s *Server) addConn(st transport.ServerTransport) bool {
790	s.mu.Lock()
791	defer s.mu.Unlock()
792	if s.conns == nil {
793		st.Close()
794		return false
795	}
796	if s.drain {
797		// Transport added after we drained our existing conns: drain it
798		// immediately.
799		st.Drain()
800	}
801	s.conns[st] = true
802	return true
803}
804
805func (s *Server) removeConn(st transport.ServerTransport) {
806	s.mu.Lock()
807	defer s.mu.Unlock()
808	if s.conns != nil {
809		delete(s.conns, st)
810		s.cv.Broadcast()
811	}
812}
813
814func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
815	return &channelz.ServerInternalMetric{
816		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
817		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
818		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
819		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
820	}
821}
822
823func (s *Server) incrCallsStarted() {
824	atomic.AddInt64(&s.czData.callsStarted, 1)
825	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
826}
827
828func (s *Server) incrCallsSucceeded() {
829	atomic.AddInt64(&s.czData.callsSucceeded, 1)
830}
831
832func (s *Server) incrCallsFailed() {
833	atomic.AddInt64(&s.czData.callsFailed, 1)
834}
835
836func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
837	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
838	if err != nil {
839		grpclog.Errorln("grpc: server failed to encode response: ", err)
840		return err
841	}
842	compData, err := compress(data, cp, comp)
843	if err != nil {
844		grpclog.Errorln("grpc: server failed to compress response: ", err)
845		return err
846	}
847	hdr, payload := msgHeader(data, compData)
848	// TODO(dfawley): should we be checking len(data) instead?
849	if len(payload) > s.opts.maxSendMessageSize {
850		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
851	}
852	err = t.Write(stream, hdr, payload, opts)
853	if err == nil && s.opts.statsHandler != nil {
854		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
855	}
856	return err
857}
858
859func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
860	if channelz.IsOn() {
861		s.incrCallsStarted()
862		defer func() {
863			if err != nil && err != io.EOF {
864				s.incrCallsFailed()
865			} else {
866				s.incrCallsSucceeded()
867			}
868		}()
869	}
870	sh := s.opts.statsHandler
871	if sh != nil {
872		beginTime := time.Now()
873		begin := &stats.Begin{
874			BeginTime: beginTime,
875		}
876		sh.HandleRPC(stream.Context(), begin)
877		defer func() {
878			end := &stats.End{
879				BeginTime: beginTime,
880				EndTime:   time.Now(),
881			}
882			if err != nil && err != io.EOF {
883				end.Error = toRPCErr(err)
884			}
885			sh.HandleRPC(stream.Context(), end)
886		}()
887	}
888	if trInfo != nil {
889		defer trInfo.tr.Finish()
890		trInfo.tr.LazyLog(&trInfo.firstLine, false)
891		defer func() {
892			if err != nil && err != io.EOF {
893				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
894				trInfo.tr.SetError()
895			}
896		}()
897	}
898
899	binlog := binarylog.GetMethodLogger(stream.Method())
900	if binlog != nil {
901		ctx := stream.Context()
902		md, _ := metadata.FromIncomingContext(ctx)
903		logEntry := &binarylog.ClientHeader{
904			Header:     md,
905			MethodName: stream.Method(),
906			PeerAddr:   nil,
907		}
908		if deadline, ok := ctx.Deadline(); ok {
909			logEntry.Timeout = time.Until(deadline)
910			if logEntry.Timeout < 0 {
911				logEntry.Timeout = 0
912			}
913		}
914		if a := md[":authority"]; len(a) > 0 {
915			logEntry.Authority = a[0]
916		}
917		if peer, ok := peer.FromContext(ctx); ok {
918			logEntry.PeerAddr = peer.Addr
919		}
920		binlog.Log(logEntry)
921	}
922
923	// comp and cp are used for compression.  decomp and dc are used for
924	// decompression.  If comp and decomp are both set, they are the same;
925	// however they are kept separate to ensure that at most one of the
926	// compressor/decompressor variable pairs are set for use later.
927	var comp, decomp encoding.Compressor
928	var cp Compressor
929	var dc Decompressor
930
931	// If dc is set and matches the stream's compression, use it.  Otherwise, try
932	// to find a matching registered compressor for decomp.
933	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
934		dc = s.opts.dc
935	} else if rc != "" && rc != encoding.Identity {
936		decomp = encoding.GetCompressor(rc)
937		if decomp == nil {
938			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
939			t.WriteStatus(stream, st)
940			return st.Err()
941		}
942	}
943
944	// If cp is set, use it.  Otherwise, attempt to compress the response using
945	// the incoming message compression method.
946	//
947	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
948	if s.opts.cp != nil {
949		cp = s.opts.cp
950		stream.SetSendCompress(cp.Type())
951	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
952		// Legacy compressor not specified; attempt to respond with same encoding.
953		comp = encoding.GetCompressor(rc)
954		if comp != nil {
955			stream.SetSendCompress(rc)
956		}
957	}
958
959	var payInfo *payloadInfo
960	if sh != nil || binlog != nil {
961		payInfo = &payloadInfo{}
962	}
963	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
964	if err != nil {
965		if st, ok := status.FromError(err); ok {
966			if e := t.WriteStatus(stream, st); e != nil {
967				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
968			}
969		}
970		return err
971	}
972	if channelz.IsOn() {
973		t.IncrMsgRecv()
974	}
975	df := func(v interface{}) error {
976		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
977			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
978		}
979		if sh != nil {
980			sh.HandleRPC(stream.Context(), &stats.InPayload{
981				RecvTime: time.Now(),
982				Payload:  v,
983				Data:     d,
984				Length:   len(d),
985			})
986		}
987		if binlog != nil {
988			binlog.Log(&binarylog.ClientMessage{
989				Message: d,
990			})
991		}
992		if trInfo != nil {
993			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
994		}
995		return nil
996	}
997	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
998	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
999	if appErr != nil {
1000		appStatus, ok := status.FromError(appErr)
1001		if !ok {
1002			// Convert appErr if it is not a grpc status error.
1003			appErr = status.Error(codes.Unknown, appErr.Error())
1004			appStatus, _ = status.FromError(appErr)
1005		}
1006		if trInfo != nil {
1007			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1008			trInfo.tr.SetError()
1009		}
1010		if e := t.WriteStatus(stream, appStatus); e != nil {
1011			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1012		}
1013		if binlog != nil {
1014			if h, _ := stream.Header(); h.Len() > 0 {
1015				// Only log serverHeader if there was header. Otherwise it can
1016				// be trailer only.
1017				binlog.Log(&binarylog.ServerHeader{
1018					Header: h,
1019				})
1020			}
1021			binlog.Log(&binarylog.ServerTrailer{
1022				Trailer: stream.Trailer(),
1023				Err:     appErr,
1024			})
1025		}
1026		return appErr
1027	}
1028	if trInfo != nil {
1029		trInfo.tr.LazyLog(stringer("OK"), false)
1030	}
1031	opts := &transport.Options{Last: true}
1032
1033	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1034		if err == io.EOF {
1035			// The entire stream is done (for unary RPC only).
1036			return err
1037		}
1038		if s, ok := status.FromError(err); ok {
1039			if e := t.WriteStatus(stream, s); e != nil {
1040				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1041			}
1042		} else {
1043			switch st := err.(type) {
1044			case transport.ConnectionError:
1045				// Nothing to do here.
1046			default:
1047				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1048			}
1049		}
1050		if binlog != nil {
1051			h, _ := stream.Header()
1052			binlog.Log(&binarylog.ServerHeader{
1053				Header: h,
1054			})
1055			binlog.Log(&binarylog.ServerTrailer{
1056				Trailer: stream.Trailer(),
1057				Err:     appErr,
1058			})
1059		}
1060		return err
1061	}
1062	if binlog != nil {
1063		h, _ := stream.Header()
1064		binlog.Log(&binarylog.ServerHeader{
1065			Header: h,
1066		})
1067		binlog.Log(&binarylog.ServerMessage{
1068			Message: reply,
1069		})
1070	}
1071	if channelz.IsOn() {
1072		t.IncrMsgSent()
1073	}
1074	if trInfo != nil {
1075		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1076	}
1077	// TODO: Should we be logging if writing status failed here, like above?
1078	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
1079	// error or allow the stats handler to see it?
1080	err = t.WriteStatus(stream, status.New(codes.OK, ""))
1081	if binlog != nil {
1082		binlog.Log(&binarylog.ServerTrailer{
1083			Trailer: stream.Trailer(),
1084			Err:     appErr,
1085		})
1086	}
1087	return err
1088}
1089
// processStreamingRPC serves one streaming RPC: it wraps stream in a
// serverStream, runs sd.Handler (through the configured stream interceptor
// when one is set) and writes the resulting status to the transport t.
//
// srv may be nil, in which case the handler is invoked with a nil server
// value. trInfo may be nil, in which case no trace events are recorded.
//
// The returned error is one of: the Unimplemented status error for a
// grpc-encoding with no registered decompressor, the handler's error
// (converted to a codes.Unknown status error when it is not already a status
// error), or, when the handler succeeded, the result of the final WriteStatus
// call. The named result err is read by the deferred channelz, stats and
// trace hooks below.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	// Channelz call counters: a nil error and io.EOF both count as success.
	if channelz.IsOn() {
		s.incrCallsStarted()
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	// Report Begin now and End on return to the stats handler, if any.
	sh := s.opts.statsHandler
	if sh != nil {
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			// io.EOF is not reported to the stats handler as an error.
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	// Attach the transport stream to the context handed to the handler so
	// that it can be retrieved with ServerTransportStreamFromContext.
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	// When binary logging is enabled for this method, log the client header
	// (metadata, method name, remaining timeout, authority and peer address).
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			// No decompressor is registered for this encoding: reject the RPC.
			// The error returned by WriteStatus is not checked here.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		// On return, record any non-EOF error, finish the trace and clear
		// ss.trInfo.tr, all while holding ss.mu.
		defer func() {
			ss.mu.Lock()
			if err != nil && err != io.EOF {
				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				ss.trInfo.tr.SetError()
			}
			ss.trInfo.tr.Finish()
			ss.trInfo.tr = nil
			ss.mu.Unlock()
		}()
	}
	var appErr error
	// server stays nil when no service was supplied by the caller.
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	// Call the handler directly, or through the stream interceptor when one
	// is configured.
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr to a codes.Unknown status error if it is not
			// already a grpc status error.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		// The error returned by WriteStatus is not checked on this path.
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	// The handler succeeded: send an OK status and return the write result.
	err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
	if ss.binlog != nil {
		// appErr is nil on this path.
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
1247
1248func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1249	sm := stream.Method()
1250	if sm != "" && sm[0] == '/' {
1251		sm = sm[1:]
1252	}
1253	pos := strings.LastIndex(sm, "/")
1254	if pos == -1 {
1255		if trInfo != nil {
1256			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1257			trInfo.tr.SetError()
1258		}
1259		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1260		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1261			if trInfo != nil {
1262				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1263				trInfo.tr.SetError()
1264			}
1265			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1266		}
1267		if trInfo != nil {
1268			trInfo.tr.Finish()
1269		}
1270		return
1271	}
1272	service := sm[:pos]
1273	method := sm[pos+1:]
1274
1275	srv, knownService := s.m[service]
1276	if knownService {
1277		if md, ok := srv.md[method]; ok {
1278			s.processUnaryRPC(t, stream, srv, md, trInfo)
1279			return
1280		}
1281		if sd, ok := srv.sd[method]; ok {
1282			s.processStreamingRPC(t, stream, srv, sd, trInfo)
1283			return
1284		}
1285	}
1286	// Unknown service, or known server unknown method.
1287	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1288		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1289		return
1290	}
1291	var errDesc string
1292	if !knownService {
1293		errDesc = fmt.Sprintf("unknown service %v", service)
1294	} else {
1295		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1296	}
1297	if trInfo != nil {
1298		trInfo.tr.LazyPrintf("%s", errDesc)
1299		trInfo.tr.SetError()
1300	}
1301	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1302		if trInfo != nil {
1303			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1304			trInfo.tr.SetError()
1305		}
1306		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1307	}
1308	if trInfo != nil {
1309		trInfo.tr.Finish()
1310	}
1311}
1312
1313// The key to save ServerTransportStream in the context.
1314type streamKey struct{}
1315
1316// NewContextWithServerTransportStream creates a new context from ctx and
1317// attaches stream to it.
1318//
1319// This API is EXPERIMENTAL.
1320func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1321	return context.WithValue(ctx, streamKey{}, stream)
1322}
1323
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method returns the method string of the RPC; it backs grpc.Method.
	Method() string
	// SetHeader is the stream operation behind grpc.SetHeader.
	SetHeader(md metadata.MD) error
	// SendHeader is the stream operation behind grpc.SendHeader.
	SendHeader(md metadata.MD) error
	// SetTrailer is the stream operation behind grpc.SetTrailer.
	SetTrailer(md metadata.MD) error
}
1338
1339// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1340// ctx. Returns nil if the given context has no stream associated with it
1341// (which implies it is not an RPC invocation context).
1342//
1343// This API is EXPERIMENTAL.
1344func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1345	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1346	return s
1347}
1348
1349// Stop stops the gRPC server. It immediately closes all open
1350// connections and listeners.
1351// It cancels all active RPCs on the server side and the corresponding
1352// pending RPCs on the client side will get notified by connection
1353// errors.
1354func (s *Server) Stop() {
1355	s.quitOnce.Do(func() {
1356		close(s.quit)
1357	})
1358
1359	defer func() {
1360		s.serveWG.Wait()
1361		s.doneOnce.Do(func() {
1362			close(s.done)
1363		})
1364	}()
1365
1366	s.channelzRemoveOnce.Do(func() {
1367		if channelz.IsOn() {
1368			channelz.RemoveEntry(s.channelzID)
1369		}
1370	})
1371
1372	s.mu.Lock()
1373	listeners := s.lis
1374	s.lis = nil
1375	st := s.conns
1376	s.conns = nil
1377	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1378	s.cv.Broadcast()
1379	s.mu.Unlock()
1380
1381	for lis := range listeners {
1382		lis.Close()
1383	}
1384	for c := range st {
1385		c.Close()
1386	}
1387
1388	s.mu.Lock()
1389	if s.events != nil {
1390		s.events.Finish()
1391		s.events = nil
1392	}
1393	s.mu.Unlock()
1394}
1395
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
//
// If the connection set is already nil (as it is after Stop or a completed
// GracefulStop), it returns without waiting.
func (s *Server) GracefulStop() {
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	// Close the done channel exactly once when this call returns.
	defer func() {
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	// Nothing to drain: the connection set has already been cleared.
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	// Drain every connection once; s.drain records that this has been done.
	if !s.drain {
		for st := range s.conns {
			st.Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	// The lock is released while waiting on serveWG and re-acquired afterwards.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	// Block until the connection set is empty. Stop sets s.conns to nil and
	// broadcasts on s.cv, which also ends this loop.
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
1448
1449// contentSubtype must be lowercase
1450// cannot return nil
1451func (s *Server) getCodec(contentSubtype string) baseCodec {
1452	if s.opts.codec != nil {
1453		return s.opts.codec
1454	}
1455	if contentSubtype == "" {
1456		return encoding.GetCodec(proto.Name)
1457	}
1458	codec := encoding.GetCodec(contentSubtype)
1459	if codec == nil {
1460		return encoding.GetCodec(proto.Name)
1461	}
1462	return codec
1463}
1464
1465// SetHeader sets the header metadata.
1466// When called multiple times, all the provided metadata will be merged.
1467// All the metadata will be sent out when one of the following happens:
1468//  - grpc.SendHeader() is called;
1469//  - The first response is sent out;
1470//  - An RPC status is sent out (error or success).
1471func SetHeader(ctx context.Context, md metadata.MD) error {
1472	if md.Len() == 0 {
1473		return nil
1474	}
1475	stream := ServerTransportStreamFromContext(ctx)
1476	if stream == nil {
1477		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1478	}
1479	return stream.SetHeader(md)
1480}
1481
1482// SendHeader sends header metadata. It may be called at most once.
1483// The provided md and headers set by SetHeader() will be sent.
1484func SendHeader(ctx context.Context, md metadata.MD) error {
1485	stream := ServerTransportStreamFromContext(ctx)
1486	if stream == nil {
1487		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1488	}
1489	if err := stream.SendHeader(md); err != nil {
1490		return toRPCErr(err)
1491	}
1492	return nil
1493}
1494
1495// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1496// When called more than once, all the provided metadata will be merged.
1497func SetTrailer(ctx context.Context, md metadata.MD) error {
1498	if md.Len() == 0 {
1499		return nil
1500	}
1501	stream := ServerTransportStreamFromContext(ctx)
1502	if stream == nil {
1503		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1504	}
1505	return stream.SetTrailer(md)
1506}
1507
1508// Method returns the method string for the server context.  The returned
1509// string is in the format of "/service/method".
1510func Method(ctx context.Context) (string, bool) {
1511	s := ServerTransportStreamFromContext(ctx)
1512	if s == nil {
1513		return "", false
1514	}
1515	return s.Method(), true
1516}
1517
1518type channelzServer struct {
1519	s *Server
1520}
1521
1522func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1523	return c.s.channelzMetric()
1524}
1525