1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"io"
26	"math"
27	"net"
28	"net/http"
29	"reflect"
30	"runtime"
31	"strings"
32	"sync"
33	"sync/atomic"
34	"time"
35
36	"golang.org/x/net/trace"
37
38	"google.golang.org/grpc/codes"
39	"google.golang.org/grpc/credentials"
40	"google.golang.org/grpc/encoding"
41	"google.golang.org/grpc/encoding/proto"
42	"google.golang.org/grpc/grpclog"
43	"google.golang.org/grpc/internal"
44	"google.golang.org/grpc/internal/binarylog"
45	"google.golang.org/grpc/internal/channelz"
46	"google.golang.org/grpc/internal/grpcrand"
47	"google.golang.org/grpc/internal/grpcsync"
48	"google.golang.org/grpc/internal/transport"
49	"google.golang.org/grpc/keepalive"
50	"google.golang.org/grpc/metadata"
51	"google.golang.org/grpc/peer"
52	"google.golang.org/grpc/stats"
53	"google.golang.org/grpc/status"
54	"google.golang.org/grpc/tap"
55)
56
const (
	// Default message-size limits, in bytes, installed by
	// defaultServerOptions: 4 MiB for messages the server receives and
	// math.MaxInt32 for messages the server sends.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32

	// Server transports are tracked in a map which is keyed on listener
	// address. For regular gRPC traffic, connections are accepted in Serve()
	// through a call to Accept(), and we use the actual listener address as key
	// when we add it to the map. But for connections received through
	// ServeHTTP(), we do not have a listener and hence use this dummy value.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
68
69func init() {
70	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
71		return srv.opts.creds
72	}
73	internal.DrainServerTransports = func(srv *Server, addr string) {
74		srv.drainServerTransports(addr)
75	}
76}
77
// statusOK is a shared status value carrying codes.OK and an empty message.
var statusOK = status.New(codes.OK, "")
// logger is the grpclog component logger ("core") used by this file.
var logger = grpclog.Component("core")
80
// methodHandler is the function signature stored in MethodDesc.Handler. It
// receives the service implementation (srv), the call context, a dec callback
// that takes the value to decode the request into, and the unary interceptor
// to use; it returns the response value or an error.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
82
// MethodDesc represents an RPC service's method specification.
//
// MethodName is the key under which the method is registered for its service
// (see Server.register); Handler is the function invoked to serve it.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}
88
// ServiceDesc represents an RPC service's specification.
//
// ServiceName is the key the service is registered under, Methods and Streams
// list its method and stream descriptors, and Metadata is copied as-is into
// the registered service info and reported by GetServiceInfo.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}
99
// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
//
// methods and streams are keyed by MethodName and StreamName respectively and
// point into the slices of the ServiceDesc the info was built from; mdata is
// the ServiceDesc's Metadata value.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl interface{}
	methods     map[string]*MethodDesc
	streams     map[string]*StreamDesc
	mdata       interface{}
}
109
// serverWorkerData is the unit of work handed from serveStreams to a
// serverWorker goroutine: the stream to handle, the transport it arrived on,
// and the WaitGroup on which the worker calls Done once the stream has been
// handled.
type serverWorkerData struct {
	st     transport.ServerTransport
	wg     *sync.WaitGroup
	stream *transport.Stream
}
115
// Server is a gRPC server to serve RPC requests.
//
// A Server is created with NewServer, which initializes the maps, events and
// condition variable below; the zero value is not set up for use.
type Server struct {
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	// quit is checked by Serve and handleRawConn to detect that Stop or
	// GracefulStop has been called; Serve then waits on done before returning.
	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	// serverWorkerChannels holds one channel per stream worker goroutine; it
	// is populated by initServerWorkers when numServerWorkers > 0.
	serverWorkerChannels []chan *serverWorkerData
}
142
// serverOptions is the full set of configurable server settings. NewServer
// starts from a copy of defaultServerOptions and lets each ServerOption mutate
// it through its apply method; the result is stored in Server.opts.
type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}
169
// defaultServerOptions is the baseline configuration that NewServer copies
// before applying caller-supplied options. Fields not listed here start at
// their zero values.
var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}
177
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
//
// Because apply is unexported, implementations outside this package are built
// by embedding EmptyServerOption or by using the option constructors in this
// package.
type ServerOption interface {
	apply(*serverOptions)
}
182
// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

// apply implements ServerOption; it intentionally leaves the options untouched.
func (EmptyServerOption) apply(*serverOptions) {}
193
194// funcServerOption wraps a function that modifies serverOptions into an
195// implementation of the ServerOption interface.
196type funcServerOption struct {
197	f func(*serverOptions)
198}
199
200func (fdo *funcServerOption) apply(do *serverOptions) {
201	fdo.f(do)
202}
203
204func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
205	return &funcServerOption{
206		f: f,
207	}
208}
209
210// WriteBufferSize determines how much data can be batched before doing a write on the wire.
211// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
212// The default value for this buffer is 32KB.
213// Zero will disable the write buffer such that each write will be on underlying connection.
214// Note: A Send call may not directly translate to a write.
215func WriteBufferSize(s int) ServerOption {
216	return newFuncServerOption(func(o *serverOptions) {
217		o.writeBufferSize = s
218	})
219}
220
221// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
222// for one read syscall.
223// The default value for this buffer is 32KB.
224// Zero will disable read buffer for a connection so data framer can access the underlying
225// conn directly.
226func ReadBufferSize(s int) ServerOption {
227	return newFuncServerOption(func(o *serverOptions) {
228		o.readBufferSize = s
229	})
230}
231
232// InitialWindowSize returns a ServerOption that sets window size for stream.
233// The lower bound for window size is 64K and any value smaller than that will be ignored.
234func InitialWindowSize(s int32) ServerOption {
235	return newFuncServerOption(func(o *serverOptions) {
236		o.initialWindowSize = s
237	})
238}
239
240// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
241// The lower bound for window size is 64K and any value smaller than that will be ignored.
242func InitialConnWindowSize(s int32) ServerOption {
243	return newFuncServerOption(func(o *serverOptions) {
244		o.initialConnWindowSize = s
245	})
246}
247
248// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
249func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
250	if kp.Time > 0 && kp.Time < time.Second {
251		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
252		kp.Time = time.Second
253	}
254
255	return newFuncServerOption(func(o *serverOptions) {
256		o.keepaliveParams = kp
257	})
258}
259
260// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
261func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
262	return newFuncServerOption(func(o *serverOptions) {
263		o.keepalivePolicy = kep
264	})
265}
266
267// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
268//
269// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
270//
271// Deprecated: register codecs using encoding.RegisterCodec. The server will
272// automatically use registered codecs based on the incoming requests' headers.
273// See also
274// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
275// Will be supported throughout 1.x.
276func CustomCodec(codec Codec) ServerOption {
277	return newFuncServerOption(func(o *serverOptions) {
278		o.codec = codec
279	})
280}
281
282// ForceServerCodec returns a ServerOption that sets a codec for message
283// marshaling and unmarshaling.
284//
285// This will override any lookups by content-subtype for Codecs registered
286// with RegisterCodec.
287//
288// See Content-Type on
289// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
290// more details. Also see the documentation on RegisterCodec and
291// CallContentSubtype for more details on the interaction between encoding.Codec
292// and content-subtype.
293//
294// This function is provided for advanced users; prefer to register codecs
295// using encoding.RegisterCodec.
296// The server will automatically use registered codecs based on the incoming
297// requests' headers. See also
298// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
299// Will be supported throughout 1.x.
300//
301// Experimental
302//
303// Notice: This API is EXPERIMENTAL and may be changed or removed in a
304// later release.
305func ForceServerCodec(codec encoding.Codec) ServerOption {
306	return newFuncServerOption(func(o *serverOptions) {
307		o.codec = codec
308	})
309}
310
311// RPCCompressor returns a ServerOption that sets a compressor for outbound
312// messages.  For backward compatibility, all outbound messages will be sent
313// using this compressor, regardless of incoming message compression.  By
314// default, server messages will be sent using the same compressor with which
315// request messages were sent.
316//
317// Deprecated: use encoding.RegisterCompressor instead. Will be supported
318// throughout 1.x.
319func RPCCompressor(cp Compressor) ServerOption {
320	return newFuncServerOption(func(o *serverOptions) {
321		o.cp = cp
322	})
323}
324
325// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
326// messages.  It has higher priority than decompressors registered via
327// encoding.RegisterCompressor.
328//
329// Deprecated: use encoding.RegisterCompressor instead. Will be supported
330// throughout 1.x.
331func RPCDecompressor(dc Decompressor) ServerOption {
332	return newFuncServerOption(func(o *serverOptions) {
333		o.dc = dc
334	})
335}
336
337// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
338// If this is not set, gRPC uses the default limit.
339//
340// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
341func MaxMsgSize(m int) ServerOption {
342	return MaxRecvMsgSize(m)
343}
344
345// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
346// If this is not set, gRPC uses the default 4MB.
347func MaxRecvMsgSize(m int) ServerOption {
348	return newFuncServerOption(func(o *serverOptions) {
349		o.maxReceiveMessageSize = m
350	})
351}
352
353// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
354// If this is not set, gRPC uses the default `math.MaxInt32`.
355func MaxSendMsgSize(m int) ServerOption {
356	return newFuncServerOption(func(o *serverOptions) {
357		o.maxSendMessageSize = m
358	})
359}
360
361// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
362// of concurrent streams to each ServerTransport.
363func MaxConcurrentStreams(n uint32) ServerOption {
364	return newFuncServerOption(func(o *serverOptions) {
365		o.maxConcurrentStreams = n
366	})
367}
368
369// Creds returns a ServerOption that sets credentials for server connections.
370func Creds(c credentials.TransportCredentials) ServerOption {
371	return newFuncServerOption(func(o *serverOptions) {
372		o.creds = c
373	})
374}
375
376// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
377// server. Only one unary interceptor can be installed. The construction of multiple
378// interceptors (e.g., chaining) can be implemented at the caller.
379func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
380	return newFuncServerOption(func(o *serverOptions) {
381		if o.unaryInt != nil {
382			panic("The unary server interceptor was already set and may not be reset.")
383		}
384		o.unaryInt = i
385	})
386}
387
388// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
389// for unary RPCs. The first interceptor will be the outer most,
390// while the last interceptor will be the inner most wrapper around the real call.
391// All unary interceptors added by this method will be chained.
392func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
393	return newFuncServerOption(func(o *serverOptions) {
394		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
395	})
396}
397
398// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
399// server. Only one stream interceptor can be installed.
400func StreamInterceptor(i StreamServerInterceptor) ServerOption {
401	return newFuncServerOption(func(o *serverOptions) {
402		if o.streamInt != nil {
403			panic("The stream server interceptor was already set and may not be reset.")
404		}
405		o.streamInt = i
406	})
407}
408
409// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
410// for streaming RPCs. The first interceptor will be the outer most,
411// while the last interceptor will be the inner most wrapper around the real call.
412// All stream interceptors added by this method will be chained.
413func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
414	return newFuncServerOption(func(o *serverOptions) {
415		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
416	})
417}
418
419// InTapHandle returns a ServerOption that sets the tap handle for all the server
420// transport to be created. Only one can be installed.
421//
422// Experimental
423//
424// Notice: This API is EXPERIMENTAL and may be changed or removed in a
425// later release.
426func InTapHandle(h tap.ServerInHandle) ServerOption {
427	return newFuncServerOption(func(o *serverOptions) {
428		if o.inTapHandle != nil {
429			panic("The tap handle was already set and may not be reset.")
430		}
431		o.inTapHandle = h
432	})
433}
434
435// StatsHandler returns a ServerOption that sets the stats handler for the server.
436func StatsHandler(h stats.Handler) ServerOption {
437	return newFuncServerOption(func(o *serverOptions) {
438		o.statsHandler = h
439	})
440}
441
442// UnknownServiceHandler returns a ServerOption that allows for adding a custom
443// unknown service handler. The provided method is a bidi-streaming RPC service
444// handler that will be invoked instead of returning the "unimplemented" gRPC
445// error whenever a request is received for an unregistered service or method.
446// The handling function and stream interceptor (if set) have full access to
447// the ServerStream, including its Context.
448func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
449	return newFuncServerOption(func(o *serverOptions) {
450		o.unknownStreamDesc = &StreamDesc{
451			StreamName: "unknown_service_handler",
452			Handler:    streamHandler,
453			// We need to assume that the users of the streamHandler will want to use both.
454			ClientStreams: true,
455			ServerStreams: true,
456		}
457	})
458}
459
460// ConnectionTimeout returns a ServerOption that sets the timeout for
461// connection establishment (up to and including HTTP/2 handshaking) for all
462// new connections.  If this is not set, the default is 120 seconds.  A zero or
463// negative value will result in an immediate timeout.
464//
465// Experimental
466//
467// Notice: This API is EXPERIMENTAL and may be changed or removed in a
468// later release.
469func ConnectionTimeout(d time.Duration) ServerOption {
470	return newFuncServerOption(func(o *serverOptions) {
471		o.connectionTimeout = d
472	})
473}
474
475// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
476// of header list that the server is prepared to accept.
477func MaxHeaderListSize(s uint32) ServerOption {
478	return newFuncServerOption(func(o *serverOptions) {
479		o.maxHeaderListSize = &s
480	})
481}
482
483// HeaderTableSize returns a ServerOption that sets the size of dynamic
484// header table for stream.
485//
486// Experimental
487//
488// Notice: This API is EXPERIMENTAL and may be changed or removed in a
489// later release.
490func HeaderTableSize(s uint32) ServerOption {
491	return newFuncServerOption(func(o *serverOptions) {
492		o.headerTableSize = &s
493	})
494}
495
496// NumStreamWorkers returns a ServerOption that sets the number of worker
497// goroutines that should be used to process incoming streams. Setting this to
498// zero (default) will disable workers and spawn a new goroutine for each
499// stream.
500//
501// Experimental
502//
503// Notice: This API is EXPERIMENTAL and may be changed or removed in a
504// later release.
505func NumStreamWorkers(numServerWorkers uint32) ServerOption {
506	// TODO: If/when this API gets stabilized (i.e. stream workers become the
507	// only way streams are processed), change the behavior of the zero value to
508	// a sane default. Preliminary experiments suggest that a value equal to the
509	// number of CPUs available is most performant; requires thorough testing.
510	return newFuncServerOption(func(o *serverOptions) {
511		o.numServerWorkers = numServerWorkers
512	})
513}
514
// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
//
// serverWorker adds a random offset in [0, serverWorkerResetThreshold) to this
// value, so the actual per-worker count lies in [2^16, 2^17).
const serverWorkerResetThreshold = 1 << 16
521
522// serverWorkers blocks on a *transport.Stream channel forever and waits for
523// data to be fed by serveStreams. This allows different requests to be
524// processed by the same goroutine, removing the need for expensive stack
525// re-allocations (see the runtime.morestack problem [1]).
526//
527// [1] https://github.com/golang/go/issues/18138
528func (s *Server) serverWorker(ch chan *serverWorkerData) {
529	// To make sure all server workers don't reset at the same time, choose a
530	// random number of iterations before resetting.
531	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
532	for completed := 0; completed < threshold; completed++ {
533		data, ok := <-ch
534		if !ok {
535			return
536		}
537		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
538		data.wg.Done()
539	}
540	go s.serverWorker(ch)
541}
542
543// initServerWorkers creates worker goroutines and channels to process incoming
544// connections to reduce the time spent overall on runtime.morestack.
545func (s *Server) initServerWorkers() {
546	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
547	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
548		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
549		go s.serverWorker(s.serverWorkerChannels[i])
550	}
551}
552
553func (s *Server) stopServerWorkers() {
554	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
555		close(s.serverWorkerChannels[i])
556	}
557}
558
559// NewServer creates a gRPC server which has no service registered and has not
560// started to accept requests yet.
561func NewServer(opt ...ServerOption) *Server {
562	opts := defaultServerOptions
563	for _, o := range opt {
564		o.apply(&opts)
565	}
566	s := &Server{
567		lis:      make(map[net.Listener]bool),
568		opts:     opts,
569		conns:    make(map[string]map[transport.ServerTransport]bool),
570		services: make(map[string]*serviceInfo),
571		quit:     grpcsync.NewEvent(),
572		done:     grpcsync.NewEvent(),
573		czData:   new(channelzData),
574	}
575	chainUnaryServerInterceptors(s)
576	chainStreamServerInterceptors(s)
577	s.cv = sync.NewCond(&s.mu)
578	if EnableTracing {
579		_, file, line, _ := runtime.Caller(1)
580		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
581	}
582
583	if s.opts.numServerWorkers > 0 {
584		s.initServerWorkers()
585	}
586
587	if channelz.IsOn() {
588		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
589	}
590	return s
591}
592
593// printf records an event in s's event log, unless s has been stopped.
594// REQUIRES s.mu is held.
595func (s *Server) printf(format string, a ...interface{}) {
596	if s.events != nil {
597		s.events.Printf(format, a...)
598	}
599}
600
601// errorf records an error in s's event log, unless s has been stopped.
602// REQUIRES s.mu is held.
603func (s *Server) errorf(format string, a ...interface{}) {
604	if s.events != nil {
605		s.events.Errorf(format, a...)
606	}
607}
608
// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
//
// *Server satisfies this interface through its RegisterService method.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface.  It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}
620
621// RegisterService registers a service and its implementation to the gRPC
622// server. It is called from the IDL generated code. This must be called before
623// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
624// ensure it implements sd.HandlerType.
625func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
626	if ss != nil {
627		ht := reflect.TypeOf(sd.HandlerType).Elem()
628		st := reflect.TypeOf(ss)
629		if !st.Implements(ht) {
630			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
631		}
632	}
633	s.register(sd, ss)
634}
635
636func (s *Server) register(sd *ServiceDesc, ss interface{}) {
637	s.mu.Lock()
638	defer s.mu.Unlock()
639	s.printf("RegisterService(%q)", sd.ServiceName)
640	if s.serve {
641		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
642	}
643	if _, ok := s.services[sd.ServiceName]; ok {
644		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
645	}
646	info := &serviceInfo{
647		serviceImpl: ss,
648		methods:     make(map[string]*MethodDesc),
649		streams:     make(map[string]*StreamDesc),
650		mdata:       sd.Metadata,
651	}
652	for i := range sd.Methods {
653		d := &sd.Methods[i]
654		info.methods[d.MethodName] = d
655	}
656	for i := range sd.Streams {
657		d := &sd.Streams[i]
658		info.streams[d.StreamName] = d
659	}
660	s.services[sd.ServiceName] = info
661}
662
// MethodInfo contains the information of an RPC including its method name and type.
// GetServiceInfo reports both stream flags as false for unary methods.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
672
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
// Methods lists unary and streaming methods together; GetServiceInfo builds
// it by iterating maps, so its order is not fixed.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
679
680// GetServiceInfo returns a map from service names to ServiceInfo.
681// Service names include the package names, in the form of <package>.<service>.
682func (s *Server) GetServiceInfo() map[string]ServiceInfo {
683	ret := make(map[string]ServiceInfo)
684	for n, srv := range s.services {
685		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
686		for m := range srv.methods {
687			methods = append(methods, MethodInfo{
688				Name:           m,
689				IsClientStream: false,
690				IsServerStream: false,
691			})
692		}
693		for m, d := range srv.streams {
694			methods = append(methods, MethodInfo{
695				Name:           m,
696				IsClientStream: d.ClientStreams,
697				IsServerStream: d.ServerStreams,
698			})
699		}
700
701		ret[n] = ServiceInfo{
702			Methods:  methods,
703			Metadata: srv.mdata,
704		}
705	}
706	return ret
707}
708
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called after Stop or
// GracefulStop.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
712
713func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
714	if s.opts.creds == nil {
715		return rawConn, nil, nil
716	}
717	return s.opts.creds.ServerHandshake(rawConn)
718}
719
// listenSocket wraps a net.Listener handed to Serve together with the
// channelz ID assigned to it when channelz is on.
type listenSocket struct {
	net.Listener
	channelzID int64
}
724
725func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
726	return &channelz.SocketInternalMetric{
727		SocketOptions: channelz.GetSocketOption(l.Listener),
728		LocalAddr:     l.Listener.Addr(),
729	}
730}
731
732func (l *listenSocket) Close() error {
733	err := l.Listener.Close()
734	if channelz.IsOn() {
735		channelz.RemoveEntry(l.channelzID)
736	}
737	return err
738}
739
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// If Serve is called after Stop or GracefulStop, lis is closed and
// ErrServerStopped is returned. Accept errors that report themselves as
// temporary are retried with a delay that starts at 5ms, doubles on each
// consecutive failure and is capped at 1s.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Registered first, so it runs last on return: after the listener cleanup
	// below has completed.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	// Track the listener (wrapped so that closing it also removes its
	// channelz entry) while still holding s.mu.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On return, close and untrack the listener unless it is no longer
	// present in s.lis (or s.lis itself has been cleared).
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Errors exposing Temporary() == true are retried with backoff;
			// anything else ends the accept loop.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Wait out the delay, but give up immediately if the server
				// is being stopped.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure observed after quit has fired is reported as
			// a clean return rather than as an error.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}
833
834// handleRawConn forks a goroutine to handle a just-accepted connection that
835// has not had any I/O performed on it yet.
836func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
837	if s.quit.HasFired() {
838		rawConn.Close()
839		return
840	}
841	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
842	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
843	if err != nil {
844		// ErrConnDispatched means that the connection was dispatched away from
845		// gRPC; those connections should be left open.
846		if err != credentials.ErrConnDispatched {
847			s.mu.Lock()
848			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
849			s.mu.Unlock()
850			channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
851			rawConn.Close()
852		}
853		rawConn.SetDeadline(time.Time{})
854		return
855	}
856
857	// Finish handshaking (HTTP2)
858	st := s.newHTTP2Transport(conn, authInfo)
859	if st == nil {
860		return
861	}
862
863	rawConn.SetDeadline(time.Time{})
864	if !s.addConn(lisAddr, st) {
865		return
866	}
867	go func() {
868		s.serveStreams(st)
869		s.removeConn(lisAddr, st)
870	}()
871}
872
873func (s *Server) drainServerTransports(addr string) {
874	s.mu.Lock()
875	conns := s.conns[addr]
876	for st := range conns {
877		st.Drain()
878	}
879	s.mu.Unlock()
880}
881
882// newHTTP2Transport sets up a http/2 transport (using the
883// gRPC http2 server transport in transport/http2_server.go).
884func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
885	config := &transport.ServerConfig{
886		MaxStreams:            s.opts.maxConcurrentStreams,
887		AuthInfo:              authInfo,
888		InTapHandle:           s.opts.inTapHandle,
889		StatsHandler:          s.opts.statsHandler,
890		KeepaliveParams:       s.opts.keepaliveParams,
891		KeepalivePolicy:       s.opts.keepalivePolicy,
892		InitialWindowSize:     s.opts.initialWindowSize,
893		InitialConnWindowSize: s.opts.initialConnWindowSize,
894		WriteBufferSize:       s.opts.writeBufferSize,
895		ReadBufferSize:        s.opts.readBufferSize,
896		ChannelzParentID:      s.channelzID,
897		MaxHeaderListSize:     s.opts.maxHeaderListSize,
898		HeaderTableSize:       s.opts.headerTableSize,
899	}
900	st, err := transport.NewServerTransport("http2", c, config)
901	if err != nil {
902		s.mu.Lock()
903		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
904		s.mu.Unlock()
905		c.Close()
906		channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
907		return nil
908	}
909
910	return st
911}
912
913func (s *Server) serveStreams(st transport.ServerTransport) {
914	defer st.Close()
915	var wg sync.WaitGroup
916
917	var roundRobinCounter uint32
918	st.HandleStreams(func(stream *transport.Stream) {
919		wg.Add(1)
920		if s.opts.numServerWorkers > 0 {
921			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
922			select {
923			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
924			default:
925				// If all stream workers are busy, fallback to the default code path.
926				go func() {
927					s.handleStream(st, stream, s.traceInfo(st, stream))
928					wg.Done()
929				}()
930			}
931		} else {
932			go func() {
933				defer wg.Done()
934				s.handleStream(st, stream, s.traceInfo(st, stream))
935			}()
936		}
937	}, func(ctx context.Context, method string) context.Context {
938		if !EnableTracing {
939			return ctx
940		}
941		tr := trace.New("grpc.Recv."+methodFamily(method), method)
942		return trace.NewContext(ctx, tr)
943	})
944	wg.Wait()
945}
946
// Compile-time assertion that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)
948
949// ServeHTTP implements the Go standard library's http.Handler
950// interface by responding to the gRPC request r, by looking up
951// the requested gRPC method in the gRPC server s.
952//
953// The provided HTTP request must have arrived on an HTTP/2
954// connection. When using the Go standard library's server,
955// practically this means that the Request must also have arrived
956// over TLS.
957//
958// To share one port (such as 443 for https) between gRPC and an
959// existing http.Handler, use a root http.Handler such as:
960//
961//   if r.ProtoMajor == 2 && strings.HasPrefix(
962//   	r.Header.Get("Content-Type"), "application/grpc") {
963//   	grpcServer.ServeHTTP(w, r)
964//   } else {
965//   	yourMux.ServeHTTP(w, r)
966//   }
967//
968// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
969// separate from grpc-go's HTTP/2 server. Performance and features may vary
970// between the two paths. ServeHTTP does not support some gRPC features
971// available through grpc-go's HTTP/2 server.
972//
973// Experimental
974//
975// Notice: This API is EXPERIMENTAL and may be changed or removed in a
976// later release.
977func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
978	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
979	if err != nil {
980		http.Error(w, err.Error(), http.StatusInternalServerError)
981		return
982	}
983	if !s.addConn(listenerAddressForServeHTTP, st) {
984		return
985	}
986	defer s.removeConn(listenerAddressForServeHTTP, st)
987	s.serveStreams(st)
988}
989
990// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
991// If tracing is not enabled, it returns nil.
992func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
993	if !EnableTracing {
994		return nil
995	}
996	tr, ok := trace.FromContext(stream.Context())
997	if !ok {
998		return nil
999	}
1000
1001	trInfo = &traceInfo{
1002		tr: tr,
1003		firstLine: firstLine{
1004			client:     false,
1005			remoteAddr: st.RemoteAddr(),
1006		},
1007	}
1008	if dl, ok := stream.Context().Deadline(); ok {
1009		trInfo.firstLine.deadline = time.Until(dl)
1010	}
1011	return trInfo
1012}
1013
1014func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
1015	s.mu.Lock()
1016	defer s.mu.Unlock()
1017	if s.conns == nil {
1018		st.Close()
1019		return false
1020	}
1021	if s.drain {
1022		// Transport added after we drained our existing conns: drain it
1023		// immediately.
1024		st.Drain()
1025	}
1026
1027	if s.conns[addr] == nil {
1028		// Create a map entry if this is the first connection on this listener.
1029		s.conns[addr] = make(map[transport.ServerTransport]bool)
1030	}
1031	s.conns[addr][st] = true
1032	return true
1033}
1034
1035func (s *Server) removeConn(addr string, st transport.ServerTransport) {
1036	s.mu.Lock()
1037	defer s.mu.Unlock()
1038
1039	conns := s.conns[addr]
1040	if conns != nil {
1041		delete(conns, st)
1042		if len(conns) == 0 {
1043			// If the last connection for this address is being removed, also
1044			// remove the map entry corresponding to the address. This is used
1045			// in GracefulStop() when waiting for all connections to be closed.
1046			delete(s.conns, addr)
1047		}
1048		s.cv.Broadcast()
1049	}
1050}
1051
1052func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
1053	return &channelz.ServerInternalMetric{
1054		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
1055		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
1056		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
1057		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
1058	}
1059}
1060
1061func (s *Server) incrCallsStarted() {
1062	atomic.AddInt64(&s.czData.callsStarted, 1)
1063	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
1064}
1065
1066func (s *Server) incrCallsSucceeded() {
1067	atomic.AddInt64(&s.czData.callsSucceeded, 1)
1068}
1069
1070func (s *Server) incrCallsFailed() {
1071	atomic.AddInt64(&s.czData.callsFailed, 1)
1072}
1073
1074func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
1075	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
1076	if err != nil {
1077		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
1078		return err
1079	}
1080	compData, err := compress(data, cp, comp)
1081	if err != nil {
1082		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
1083		return err
1084	}
1085	hdr, payload := msgHeader(data, compData)
1086	// TODO(dfawley): should we be checking len(data) instead?
1087	if len(payload) > s.opts.maxSendMessageSize {
1088		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
1089	}
1090	err = t.Write(stream, hdr, payload, opts)
1091	if err == nil && s.opts.statsHandler != nil {
1092		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
1093	}
1094	return err
1095}
1096
1097// chainUnaryServerInterceptors chains all unary server interceptors into one.
1098func chainUnaryServerInterceptors(s *Server) {
1099	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
1100	// be executed before any other chained interceptors.
1101	interceptors := s.opts.chainUnaryInts
1102	if s.opts.unaryInt != nil {
1103		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
1104	}
1105
1106	var chainedInt UnaryServerInterceptor
1107	if len(interceptors) == 0 {
1108		chainedInt = nil
1109	} else if len(interceptors) == 1 {
1110		chainedInt = interceptors[0]
1111	} else {
1112		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
1113			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
1114		}
1115	}
1116
1117	s.opts.unaryInt = chainedInt
1118}
1119
1120// getChainUnaryHandler recursively generate the chained UnaryHandler
1121func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
1122	if curr == len(interceptors)-1 {
1123		return finalHandler
1124	}
1125
1126	return func(ctx context.Context, req interface{}) (interface{}, error) {
1127		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
1128	}
1129}
1130
// processUnaryRPC serves a single unary RPC on stream. It receives the
// request via recvAndDecompress, invokes md.Handler (passing s.opts.unaryInt
// as the interceptor), sends the reply with sendResponse and writes the final
// status through t.
//
// When a stats handler, trace info or channelz is active, begin/end
// bookkeeping is performed around the call. When a binary method logger
// exists for the method, client header, client message, server header, server
// message and server trailer entries are logged.
//
// The returned error is either the error from receiving the request, the
// application error (converted to a status error when it is not one
// already), the error from sending the response, or the result of writing
// the OK status.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		//
		// The closure reads the named result err, so it observes whatever value
		// the function ultimately returns.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Log the client header entry when binary logging is enabled for this
	// method. Note that md and peer below shadow the MethodDesc parameter and
	// the peer package within this block only.
	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			// No decompressor for the request encoding: fail the RPC with
			// Unimplemented. The error from WriteStatus is not checked here.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	// payInfo is only allocated when a stats handler or binary logger is
	// present; otherwise nil is passed to recvAndDecompress.
	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is the decode function handed to the method handler: it unmarshals
	// the received bytes d into v and then reports the inbound payload to the
	// stats handler, binary logger and trace, whichever are active.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			// payInfo is non-nil here because it is allocated whenever sh != nil.
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength + headerLen,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	// Make the transport stream retrievable from the handler's context.
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	// The reply is the only message of a unary RPC, so it is written with
	// Last set.
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			// NOTE(review): appErr is always nil on this path (a non-nil
			// appErr returned above), so the trailer entry does not carry the
			// send error err -- confirm this is intended.
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		// appErr is nil here; the trailer is logged for the OK status.
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
1377
1378// chainStreamServerInterceptors chains all stream server interceptors into one.
1379func chainStreamServerInterceptors(s *Server) {
1380	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1381	// be executed before any other chained interceptors.
1382	interceptors := s.opts.chainStreamInts
1383	if s.opts.streamInt != nil {
1384		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1385	}
1386
1387	var chainedInt StreamServerInterceptor
1388	if len(interceptors) == 0 {
1389		chainedInt = nil
1390	} else if len(interceptors) == 1 {
1391		chainedInt = interceptors[0]
1392	} else {
1393		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
1394			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
1395		}
1396	}
1397
1398	s.opts.streamInt = chainedInt
1399}
1400
1401// getChainStreamHandler recursively generate the chained StreamHandler
1402func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
1403	if curr == len(interceptors)-1 {
1404		return finalHandler
1405	}
1406
1407	return func(srv interface{}, ss ServerStream) error {
1408		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
1409	}
1410}
1411
// processStreamingRPC serves a streaming RPC on stream. It wraps the
// transport stream in a serverStream, selects the decompressor/compressor,
// runs sd.Handler (through s.opts.streamInt when an interceptor is
// configured) and writes the final status through t.
//
// info may be nil (handleStream passes nil when using the unknown-stream
// descriptor); in that case the handler receives a nil server value.
//
// When a stats handler, trace info or channelz is active, begin/end
// bookkeeping is performed around the call. When a binary method logger
// exists for the method, client header and server trailer entries are logged
// here.
//
// The returned error is the application error (converted to a status error
// when it is not one already), the Unimplemented error for a missing
// decompressor, or the result of writing the OK status.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	// Make the transport stream retrievable from the handler's context.
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		//
		// The closure reads the named result err, so it observes whatever value
		// the function ultimately returns.
		defer func() {
			if trInfo != nil {
				// The trace is finished and cleared while holding ss.mu.
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Log the client header entry when binary logging is enabled for this
	// method. Note that peer below shadows the peer package within its if
	// statement only.
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			// No decompressor for the request encoding: fail the RPC with
			// Unimplemented. The error from WriteStatus is not checked here.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	// server stays nil when no service info was supplied.
	var server interface{}
	if info != nil {
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		// info here is a StreamServerInfo that shadows the serviceInfo
		// parameter within this branch.
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		// appErr is nil here; the trailer is logged for the OK status.
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
1578
1579func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1580	sm := stream.Method()
1581	if sm != "" && sm[0] == '/' {
1582		sm = sm[1:]
1583	}
1584	pos := strings.LastIndex(sm, "/")
1585	if pos == -1 {
1586		if trInfo != nil {
1587			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1588			trInfo.tr.SetError()
1589		}
1590		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1591		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1592			if trInfo != nil {
1593				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1594				trInfo.tr.SetError()
1595			}
1596			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
1597		}
1598		if trInfo != nil {
1599			trInfo.tr.Finish()
1600		}
1601		return
1602	}
1603	service := sm[:pos]
1604	method := sm[pos+1:]
1605
1606	srv, knownService := s.services[service]
1607	if knownService {
1608		if md, ok := srv.methods[method]; ok {
1609			s.processUnaryRPC(t, stream, srv, md, trInfo)
1610			return
1611		}
1612		if sd, ok := srv.streams[method]; ok {
1613			s.processStreamingRPC(t, stream, srv, sd, trInfo)
1614			return
1615		}
1616	}
1617	// Unknown service, or known server unknown method.
1618	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1619		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1620		return
1621	}
1622	var errDesc string
1623	if !knownService {
1624		errDesc = fmt.Sprintf("unknown service %v", service)
1625	} else {
1626		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1627	}
1628	if trInfo != nil {
1629		trInfo.tr.LazyPrintf("%s", errDesc)
1630		trInfo.tr.SetError()
1631	}
1632	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1633		if trInfo != nil {
1634			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1635			trInfo.tr.SetError()
1636		}
1637		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
1638	}
1639	if trInfo != nil {
1640		trInfo.tr.Finish()
1641	}
1642}
1643
// streamKey is the context key under which a ServerTransportStream is stored
// (see NewContextWithServerTransportStream and
// ServerTransportStreamFromContext).
type streamKey struct{}
1646
1647// NewContextWithServerTransportStream creates a new context from ctx and
1648// attaches stream to it.
1649//
1650// Experimental
1651//
1652// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1653// later release.
1654func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1655	return context.WithValue(ctx, streamKey{}, stream)
1656}
1657
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// The package-level helpers Method, SetHeader, SendHeader and SetTrailer
// fetch the stream from the context and call the method of the same name.
//
// See also NewContextWithServerTransportStream.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	// Method returns the method string of the RPC.
	Method() string
	// SetHeader is called by the package-level SetHeader with the header
	// metadata to set.
	SetHeader(md metadata.MD) error
	// SendHeader is called by the package-level SendHeader with the header
	// metadata to send.
	SendHeader(md metadata.MD) error
	// SetTrailer is called by the package-level SetTrailer with the trailer
	// metadata to set.
	SetTrailer(md metadata.MD) error
}
1675
1676// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1677// ctx. Returns nil if the given context has no stream associated with it
1678// (which implies it is not an RPC invocation context).
1679//
1680// Experimental
1681//
1682// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1683// later release.
1684func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1685	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1686	return s
1687}
1688
// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quit.Fire()

	// On the way out, wait for the goroutines tracked by serveWG and only
	// then fire the done event.
	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	// The channelz entry is removed at most once, even if Stop and
	// GracefulStop are both called.
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	// Detach the listeners and connections under the lock, then close them
	// after releasing it. A nil s.conns makes addConn close and reject any
	// transport added afterwards.
	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	conns := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for _, cs := range conns {
		for st := range cs {
			st.Close()
		}
	}
	if s.opts.numServerWorkers > 0 {
		s.stopServerWorkers()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
1736
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quit.Fire()
	defer s.done.Fire()

	// The channelz entry is removed at most once, even if Stop and
	// GracefulStop are both called.
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	// A nil connection map means the server has already been stopped (Stop
	// and a completed GracefulStop both set it to nil); nothing left to do.
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	// Drain existing transports once. The drain flag also makes addConn drain
	// any transport that is registered later.
	if !s.drain {
		for _, conns := range s.conns {
			for st := range conns {
				st.Drain()
			}
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	// removeConn broadcasts on s.cv whenever a connection goes away; Stop sets
	// s.conns to nil and broadcasts as well, which also ends this loop.
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
1784
1785// contentSubtype must be lowercase
1786// cannot return nil
1787func (s *Server) getCodec(contentSubtype string) baseCodec {
1788	if s.opts.codec != nil {
1789		return s.opts.codec
1790	}
1791	if contentSubtype == "" {
1792		return encoding.GetCodec(proto.Name)
1793	}
1794	codec := encoding.GetCodec(contentSubtype)
1795	if codec == nil {
1796		return encoding.GetCodec(proto.Name)
1797	}
1798	return codec
1799}
1800
1801// SetHeader sets the header metadata.
1802// When called multiple times, all the provided metadata will be merged.
1803// All the metadata will be sent out when one of the following happens:
1804//  - grpc.SendHeader() is called;
1805//  - The first response is sent out;
1806//  - An RPC status is sent out (error or success).
1807func SetHeader(ctx context.Context, md metadata.MD) error {
1808	if md.Len() == 0 {
1809		return nil
1810	}
1811	stream := ServerTransportStreamFromContext(ctx)
1812	if stream == nil {
1813		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1814	}
1815	return stream.SetHeader(md)
1816}
1817
1818// SendHeader sends header metadata. It may be called at most once.
1819// The provided md and headers set by SetHeader() will be sent.
1820func SendHeader(ctx context.Context, md metadata.MD) error {
1821	stream := ServerTransportStreamFromContext(ctx)
1822	if stream == nil {
1823		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1824	}
1825	if err := stream.SendHeader(md); err != nil {
1826		return toRPCErr(err)
1827	}
1828	return nil
1829}
1830
1831// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1832// When called more than once, all the provided metadata will be merged.
1833func SetTrailer(ctx context.Context, md metadata.MD) error {
1834	if md.Len() == 0 {
1835		return nil
1836	}
1837	stream := ServerTransportStreamFromContext(ctx)
1838	if stream == nil {
1839		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1840	}
1841	return stream.SetTrailer(md)
1842}
1843
1844// Method returns the method string for the server context.  The returned
1845// string is in the format of "/service/method".
1846func Method(ctx context.Context) (string, bool) {
1847	s := ServerTransportStreamFromContext(ctx)
1848	if s == nil {
1849		return "", false
1850	}
1851	return s.Method(), true
1852}
1853
// channelzServer wraps a *Server and exposes its channelz metrics through
// the ChannelzMetric method.
type channelzServer struct {
	s *Server // server whose metrics are reported
}
1857
1858func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1859	return c.s.channelzMetric()
1860}
1861