/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32
)

var statusOK = status.New(codes.OK, "")

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}

// service holds the server instance serving this service and the methods in
// this service.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc
	sd     map[string]*StreamDesc
	mdata  interface{}
}

type serverWorkerData struct {
	st     transport.ServerTransport
	wg     *sync.WaitGroup
	stream *transport.Stream
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu     sync.Mutex // guards following
	lis    map[net.Listener]bool
	conns  map[transport.ServerTransport]bool
	serve  bool
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	events trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	serverWorkerChannels []chan *serverWorkerData
}

type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}

var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// This API is EXPERIMENTAL.
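//
// A minimal sketch of that pattern (myOption is illustrative, not part of
// this package):
//
//   type myOption struct {
//   	grpc.EmptyServerOption
//   	data string // consumed by your own code, ignored by grpc itself
//   }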
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}

// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fdo *funcServerOption) apply(do *serverOptions) {
	fdo.f(do)
}

func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: f,
	}
}

// WriteBufferSize determines how much data can be batched before doing a write
// on the wire. The corresponding memory allocation for this buffer will be
// twice the size to keep syscalls low. The default value for this buffer is
// 32KB. Zero will disable the write buffer such that each write will be on the
// underlying connection. Note: A Send call may not directly translate to a
// write.
func WriteBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.writeBufferSize = s
	})
}

// ReadBufferSize lets you set the size of the read buffer; this determines how
// much data can be read at most for one read syscall. The default value for
// this buffer is 32KB. Zero will disable the read buffer for a connection so
// the data framer can access the underlying conn directly.
func ReadBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.readBufferSize = s
	})
}

// InitialWindowSize returns a ServerOption that sets the window size for a
// stream. The lower bound for window size is 64K and any value smaller than
// that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialWindowSize = s
	})
}

// InitialConnWindowSize returns a ServerOption that sets the window size for a
// connection. The lower bound for window size is 64K and any value smaller
// than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialConnWindowSize = s
	})
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
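//
// For example, a sketch that pings clients after two hours of inactivity and
// drops them if the ping is not acknowledged in time (values illustrative):
//
//   grpc.KeepaliveParams(keepalive.ServerParameters{
//   	Time:    2 * time.Hour,
//   	Timeout: 20 * time.Second,
//   })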
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	if kp.Time > 0 && kp.Time < time.Second {
		grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
		kp.Time = time.Second
	}

	return newFuncServerOption(func(o *serverOptions) {
		o.keepaliveParams = kp
	})
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.keepalivePolicy = kep
	})
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
func CustomCodec(codec Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages.  For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression.  By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.cp = cp
	})
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages.  It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.dc = dc
	})
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxReceiveMessageSize = m
	})
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxSendMessageSize = m
	})
}

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxConcurrentStreams = n
	})
}

// Creds returns a ServerOption that sets credentials for server connections.
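//
// For example, a TLS sketch (the certificate paths are illustrative):
//
//   creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
//   if err != nil {
//   	// handle error
//   }
//   s := grpc.NewServer(grpc.Creds(creds))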
func Creds(c credentials.TransportCredentials) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.creds = c
	})
}

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
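//
// A minimal pass-through interceptor, as a sketch (the logging is
// illustrative):
//
//   func logger(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
//   	grpclog.Infof("call: %s", info.FullMethod)
//   	return handler(ctx, req)
//   }
//
//   s := grpc.NewServer(grpc.UnaryInterceptor(logger))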
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
// for unary RPCs. The first interceptor will be the outermost, while the last
// interceptor will be the innermost wrapper around the real call. All unary
// interceptors added by this method will be chained.
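//
// For example, with the sketch below (logger and auth being hypothetical
// interceptors), logger runs first on the way in and wraps auth, which in
// turn wraps the method handler:
//
//   grpc.ChainUnaryInterceptor(logger, auth)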
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
	})
}

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
// for streaming RPCs. The first interceptor will be the outermost, while the
// last interceptor will be the innermost wrapper around the real call. All
// stream interceptors added by this method will be chained.
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
	})
}

// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transports to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	})
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.statsHandler = h
	})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function and stream interceptor (if set) have full access to
// the ServerStream, including its Context.
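//
// A catch-all sketch that rejects unknown methods with a custom message (a
// real handler would typically proxy the stream instead):
//
//   grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
//   	method, _ := grpc.MethodFromServerStream(stream)
//   	return status.Errorf(codes.Unimplemented, "no route for %s", method)
//   })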
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	})
}

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections.  If this is not set, the default is 120 seconds.  A zero or
// negative value will result in an immediate timeout.
//
// This API is EXPERIMENTAL.
func ConnectionTimeout(d time.Duration) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.connectionTimeout = d
	})
}

// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxHeaderListSize = &s
	})
}

// HeaderTableSize returns a ServerOption that sets the size of the dynamic
// header table for a stream.
//
// This API is EXPERIMENTAL.
func HeaderTableSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.headerTableSize = &s
	})
}

// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
//
// This API is EXPERIMENTAL.
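//
// A sketch in line with the TODO below, using the CPU count as a starting
// point (tune for your workload):
//
//   grpc.NumStreamWorkers(uint32(runtime.NumCPU()))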
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
	// TODO: If/when this API gets stabilized (i.e. stream workers become the
	// only way streams are processed), change the behavior of the zero value to
	// a sane default. Preliminary experiments suggest that a value equal to the
	// number of CPUs available is most performant; requires thorough testing.
	return newFuncServerOption(func(o *serverOptions) {
		o.numServerWorkers = numServerWorkers
	})
}

// serverWorkerResetThreshold defines how often a worker's stack must be
// reset. Every N requests, a worker can reset its stack by spawning a new
// goroutine in its place, so that large stacks don't live in memory forever.
// 2^16 should allow each goroutine stack to live for at least a few seconds in
// a typical workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16

// serverWorker blocks on a *serverWorkerData channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
	// To make sure all server workers don't reset at the same time, choose a
	// random number of iterations before resetting.
	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
	for completed := 0; completed < threshold; completed++ {
		data, ok := <-ch
		if !ok {
			return
		}
		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
		data.wg.Done()
	}
	go s.serverWorker(ch)
}

// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
		go s.serverWorker(s.serverWorkerChannels[i])
	}
}

func (s *Server) stopServerWorkers() {
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		close(s.serverWorkerChannels[i])
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
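//
// Typical usage, as a sketch (pb.RegisterGreeterServer and greeterServer are
// illustrative generated/user code, not part of this package):
//
//   s := grpc.NewServer(grpc.MaxRecvMsgSize(8 * 1024 * 1024))
//   pb.RegisterGreeterServer(s, &greeterServer{})
//   lis, err := net.Listen("tcp", ":50051")
//   if err != nil {
//   	// handle error
//   }
//   s.Serve(lis)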
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:    make(map[net.Listener]bool),
		opts:   opts,
		conns:  make(map[transport.ServerTransport]bool),
		m:      make(map[string]*service),
		quit:   grpcsync.NewEvent(),
		done:   grpcsync.NewEvent(),
		czData: new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve.
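//
// Generated code typically wraps this call; a hand-written sketch would look
// like (the descriptor values are illustrative):
//
//   var greeterDesc = grpc.ServiceDesc{
//   	ServiceName: "helloworld.Greeter",
//   	HandlerType: (*GreeterServer)(nil),
//   	Methods:     []grpc.MethodDesc{ /* per-method handlers */ },
//   }
//   s.RegisterService(&greeterDesc, &greeterServer{})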
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	ht := reflect.TypeOf(sd.HandlerType).Elem()
	st := reflect.TypeOf(ss)
	if !st.Implements(ht) {
		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
	}
	s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.m[sd.ServiceName]; ok {
		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	srv := &service{
		server: ss,
		md:     make(map[string]*MethodDesc),
		sd:     make(map[string]*StreamDesc),
		mdata:  sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		srv.md[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		srv.sd[d.StreamName] = d
	}
	s.m[sd.ServiceName] = srv
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.m {
		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
		for m := range srv.md {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.sd {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if s.opts.creds == nil {
		return rawConn, nil, nil
	}
	return s.opts.creds.ServerHandshake(rawConn)
}

type listenSocket struct {
	net.Listener
	channelzID int64
}

func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(l.channelzID)
	}
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
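//
// A minimal sketch (the address is illustrative):
//
//   lis, err := net.Listen("tcp", ":50051")
//   if err != nil {
//   	// handle error
//   }
//   if err := s.Serve(lis); err != nil {
//   	// Serve failed with a fatal listener error.
//   }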
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		AuthInfo:              authInfo,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandler:          s.opts.statsHandler,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
	}
	st, err := transport.NewServerTransport("http2", c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		c.Close()
		channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
		return nil
	}

	return st
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup

	var roundRobinCounter uint32
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			// Round-robin the stream over the worker channels.
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				// If all stream workers are busy, fall back to the default code path.
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//   if r.ProtoMajor == 2 && strings.HasPrefix(
//   	r.Header.Get("Content-Type"), "application/grpc") {
//   	grpcServer.ServeHTTP(w, r)
//   } else {
//   	yourMux.ServeHTTP(w, r)
//   }
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
// and subject to change.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if !s.addConn(st) {
		return
	}
	defer s.removeConn(st)
	s.serveStreams(st)
}

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	if !EnableTracing {
		return nil
	}
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: st.RemoteAddr(),
		},
	}
	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = time.Until(dl)
	}
	return trInfo
}

func (s *Server) addConn(st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		st.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain()
	}
	s.conns[st] = true
	return true
}

func (s *Server) removeConn(st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns != nil {
		delete(s.conns, st)
		s.cv.Broadcast()
	}
}

func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.unaryInt = chainedInt
}

// getChainUnaryHandler recursively generates the chained UnaryHandler.
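// For interceptors [a, b, c], the chain is conceptually a -> b -> c ->
// finalHandler: a's handler argument invokes b, b's invokes c, and c's
// invokes the actual method handler.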
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(ctx context.Context, req interface{}) (interface{}, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if a header was set; otherwise the
				// response can be trailer-only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

// chainStreamServerInterceptors chains all stream server interceptors into one.
func chainStreamServerInterceptors(s *Server) {
	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainStreamInts
	if s.opts.streamInt != nil {
		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
	}

	var chainedInt StreamServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
		}
	}

	s.opts.streamInt = chainedInt
}

// getChainStreamHandler recursively generates the chained StreamHandler.
func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}

	return func(srv interface{}, ss ServerStream) error {
		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.m[service]
	if knownService {
		if md, ok := srv.md[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.sd[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known service with an unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

// The key to save ServerTransportStream in the context.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// This API is EXPERIMENTAL.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
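//
// A test sketch (mockStream is a hypothetical type implementing this
// interface):
//
//   ctx := grpc.NewContextWithServerTransportStream(context.Background(), &mockStream{})
//   err := grpc.SetHeader(ctx, metadata.Pairs("k", "v"))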
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// This API is EXPERIMENTAL.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quit.Fire()

	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}
	if s.opts.numServerWorkers > 0 {
		s.stopServerWorkers()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
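//
// A common shutdown sketch (the signal wiring is illustrative):
//
//   go func() {
//   	<-shutdownCh // e.g. closed on SIGTERM
//   	s.GracefulStop()
//   }()
//   s.Serve(lis)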
func (s *Server) GracefulStop() {
	s.quit.Fire()
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	if !s.drain {
		for st := range s.conns {
			st.Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// getCodec returns the codec to use for the given content-subtype.
// contentSubtype must be lowercase; the returned codec is never nil.
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
//  - grpc.SendHeader() is called;
//  - The first response is sent out;
//  - An RPC status is sent out (error or success).
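//
// For example, inside a unary handler (the header values are illustrative):
//
//   grpc.SetHeader(ctx, metadata.Pairs("x-request-id", "abc123"))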
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}

// SendHeader sends header metadata. It may be called at most once.
// The provided md and headers set by SetHeader() will be sent.
func SendHeader(ctx context.Context, md metadata.MD) error {
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	if err := stream.SendHeader(md); err != nil {
		return toRPCErr(err)
	}
	return nil
}

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetTrailer(md)
}

// Method returns the method string for the server context.  The returned
// string is in the format of "/service/method".
func Method(ctx context.Context) (string, bool) {
	s := ServerTransportStreamFromContext(ctx)
	if s == nil {
		return "", false
	}
	return s.Method(), true
}

type channelzServer struct {
	s *Server
}

func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
	return c.s.channelzMetric()
}