1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"io"
26	"math"
27	"net"
28	"net/http"
29	"reflect"
30	"runtime"
31	"strings"
32	"sync"
33	"sync/atomic"
34	"time"
35
36	"golang.org/x/net/trace"
37
38	"google.golang.org/grpc/codes"
39	"google.golang.org/grpc/credentials"
40	"google.golang.org/grpc/encoding"
41	"google.golang.org/grpc/encoding/proto"
42	"google.golang.org/grpc/grpclog"
43	"google.golang.org/grpc/internal/binarylog"
44	"google.golang.org/grpc/internal/channelz"
45	"google.golang.org/grpc/internal/grpcsync"
46	"google.golang.org/grpc/internal/transport"
47	"google.golang.org/grpc/keepalive"
48	"google.golang.org/grpc/metadata"
49	"google.golang.org/grpc/peer"
50	"google.golang.org/grpc/stats"
51	"google.golang.org/grpc/status"
52	"google.golang.org/grpc/tap"
53)
54
55const (
56	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
57	defaultServerMaxSendMessageSize    = math.MaxInt32
58)
59
60var statusOK = status.New(codes.OK, "")
61
62type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
63
64// MethodDesc represents an RPC service's method specification.
65type MethodDesc struct {
66	MethodName string
67	Handler    methodHandler
68}
69
70// ServiceDesc represents an RPC service's specification.
71type ServiceDesc struct {
72	ServiceName string
73	// The pointer to the service interface. Used to check whether the user
74	// provided implementation satisfies the interface requirements.
75	HandlerType interface{}
76	Methods     []MethodDesc
77	Streams     []StreamDesc
78	Metadata    interface{}
79}
80
81// service consists of the information of the server serving this service and
82// the methods in this service.
83type service struct {
84	server interface{} // the server for service methods
85	md     map[string]*MethodDesc
86	sd     map[string]*StreamDesc
87	mdata  interface{}
88}
89
90// Server is a gRPC server to serve RPC requests.
91type Server struct {
92	opts serverOptions
93
94	mu     sync.Mutex // guards following
95	lis    map[net.Listener]bool
96	conns  map[transport.ServerTransport]bool
97	serve  bool
98	drain  bool
99	cv     *sync.Cond          // signaled when connections close for GracefulStop
100	m      map[string]*service // service name -> service info
101	events trace.EventLog
102
103	quit               *grpcsync.Event
104	done               *grpcsync.Event
105	channelzRemoveOnce sync.Once
106	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop
107
108	channelzID int64 // channelz unique identification number
109	czData     *channelzData
110}
111
112type serverOptions struct {
113	creds                 credentials.TransportCredentials
114	codec                 baseCodec
115	cp                    Compressor
116	dc                    Decompressor
117	unaryInt              UnaryServerInterceptor
118	streamInt             StreamServerInterceptor
119	chainUnaryInts        []UnaryServerInterceptor
120	chainStreamInts       []StreamServerInterceptor
121	inTapHandle           tap.ServerInHandle
122	statsHandler          stats.Handler
123	maxConcurrentStreams  uint32
124	maxReceiveMessageSize int
125	maxSendMessageSize    int
126	unknownStreamDesc     *StreamDesc
127	keepaliveParams       keepalive.ServerParameters
128	keepalivePolicy       keepalive.EnforcementPolicy
129	initialWindowSize     int32
130	initialConnWindowSize int32
131	writeBufferSize       int
132	readBufferSize        int
133	connectionTimeout     time.Duration
134	maxHeaderListSize     *uint32
135	headerTableSize       *uint32
136}
137
138var defaultServerOptions = serverOptions{
139	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
140	maxSendMessageSize:    defaultServerMaxSendMessageSize,
141	connectionTimeout:     120 * time.Second,
142	writeBufferSize:       defaultWriteBufSize,
143	readBufferSize:        defaultReadBufSize,
144}
145
146// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
147type ServerOption interface {
148	apply(*serverOptions)
149}
150
151// EmptyServerOption does not alter the server configuration. It can be embedded
152// in another structure to build custom server options.
153//
154// This API is EXPERIMENTAL.
155type EmptyServerOption struct{}
156
157func (EmptyServerOption) apply(*serverOptions) {}
158
159// funcServerOption wraps a function that modifies serverOptions into an
160// implementation of the ServerOption interface.
161type funcServerOption struct {
162	f func(*serverOptions)
163}
164
165func (fdo *funcServerOption) apply(do *serverOptions) {
166	fdo.f(do)
167}
168
169func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
170	return &funcServerOption{
171		f: f,
172	}
173}
174
175// WriteBufferSize determines how much data can be batched before doing a write on the wire.
176// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
177// The default value for this buffer is 32KB.
178// Zero will disable the write buffer such that each write will be on underlying connection.
179// Note: A Send call may not directly translate to a write.
180func WriteBufferSize(s int) ServerOption {
181	return newFuncServerOption(func(o *serverOptions) {
182		o.writeBufferSize = s
183	})
184}
185
186// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
187// for one read syscall.
188// The default value for this buffer is 32KB.
189// Zero will disable read buffer for a connection so data framer can access the underlying
190// conn directly.
191func ReadBufferSize(s int) ServerOption {
192	return newFuncServerOption(func(o *serverOptions) {
193		o.readBufferSize = s
194	})
195}
196
197// InitialWindowSize returns a ServerOption that sets window size for stream.
198// The lower bound for window size is 64K and any value smaller than that will be ignored.
199func InitialWindowSize(s int32) ServerOption {
200	return newFuncServerOption(func(o *serverOptions) {
201		o.initialWindowSize = s
202	})
203}
204
205// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
206// The lower bound for window size is 64K and any value smaller than that will be ignored.
207func InitialConnWindowSize(s int32) ServerOption {
208	return newFuncServerOption(func(o *serverOptions) {
209		o.initialConnWindowSize = s
210	})
211}
212
213// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
214func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
215	if kp.Time > 0 && kp.Time < time.Second {
216		grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
217		kp.Time = time.Second
218	}
219
220	return newFuncServerOption(func(o *serverOptions) {
221		o.keepaliveParams = kp
222	})
223}
224
225// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
226func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
227	return newFuncServerOption(func(o *serverOptions) {
228		o.keepalivePolicy = kep
229	})
230}
231
232// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
233//
234// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
235func CustomCodec(codec Codec) ServerOption {
236	return newFuncServerOption(func(o *serverOptions) {
237		o.codec = codec
238	})
239}
240
241// RPCCompressor returns a ServerOption that sets a compressor for outbound
242// messages.  For backward compatibility, all outbound messages will be sent
243// using this compressor, regardless of incoming message compression.  By
244// default, server messages will be sent using the same compressor with which
245// request messages were sent.
246//
247// Deprecated: use encoding.RegisterCompressor instead.
248func RPCCompressor(cp Compressor) ServerOption {
249	return newFuncServerOption(func(o *serverOptions) {
250		o.cp = cp
251	})
252}
253
254// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
255// messages.  It has higher priority than decompressors registered via
256// encoding.RegisterCompressor.
257//
258// Deprecated: use encoding.RegisterCompressor instead.
259func RPCDecompressor(dc Decompressor) ServerOption {
260	return newFuncServerOption(func(o *serverOptions) {
261		o.dc = dc
262	})
263}
264
265// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
266// If this is not set, gRPC uses the default limit.
267//
268// Deprecated: use MaxRecvMsgSize instead.
269func MaxMsgSize(m int) ServerOption {
270	return MaxRecvMsgSize(m)
271}
272
273// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
274// If this is not set, gRPC uses the default 4MB.
275func MaxRecvMsgSize(m int) ServerOption {
276	return newFuncServerOption(func(o *serverOptions) {
277		o.maxReceiveMessageSize = m
278	})
279}
280
281// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
282// If this is not set, gRPC uses the default `math.MaxInt32`.
283func MaxSendMsgSize(m int) ServerOption {
284	return newFuncServerOption(func(o *serverOptions) {
285		o.maxSendMessageSize = m
286	})
287}
288
289// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
290// of concurrent streams to each ServerTransport.
291func MaxConcurrentStreams(n uint32) ServerOption {
292	return newFuncServerOption(func(o *serverOptions) {
293		o.maxConcurrentStreams = n
294	})
295}
296
297// Creds returns a ServerOption that sets credentials for server connections.
298func Creds(c credentials.TransportCredentials) ServerOption {
299	return newFuncServerOption(func(o *serverOptions) {
300		o.creds = c
301	})
302}
303
304// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
305// server. Only one unary interceptor can be installed. The construction of multiple
306// interceptors (e.g., chaining) can be implemented at the caller.
307func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
308	return newFuncServerOption(func(o *serverOptions) {
309		if o.unaryInt != nil {
310			panic("The unary server interceptor was already set and may not be reset.")
311		}
312		o.unaryInt = i
313	})
314}
315
316// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
317// for unary RPCs. The first interceptor will be the outer most,
318// while the last interceptor will be the inner most wrapper around the real call.
319// All unary interceptors added by this method will be chained.
320func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
321	return newFuncServerOption(func(o *serverOptions) {
322		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
323	})
324}
325
326// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
327// server. Only one stream interceptor can be installed.
328func StreamInterceptor(i StreamServerInterceptor) ServerOption {
329	return newFuncServerOption(func(o *serverOptions) {
330		if o.streamInt != nil {
331			panic("The stream server interceptor was already set and may not be reset.")
332		}
333		o.streamInt = i
334	})
335}
336
337// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
338// for stream RPCs. The first interceptor will be the outer most,
339// while the last interceptor will be the inner most wrapper around the real call.
340// All stream interceptors added by this method will be chained.
341func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
342	return newFuncServerOption(func(o *serverOptions) {
343		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
344	})
345}
346
347// InTapHandle returns a ServerOption that sets the tap handle for all the server
348// transport to be created. Only one can be installed.
349func InTapHandle(h tap.ServerInHandle) ServerOption {
350	return newFuncServerOption(func(o *serverOptions) {
351		if o.inTapHandle != nil {
352			panic("The tap handle was already set and may not be reset.")
353		}
354		o.inTapHandle = h
355	})
356}
357
358// StatsHandler returns a ServerOption that sets the stats handler for the server.
359func StatsHandler(h stats.Handler) ServerOption {
360	return newFuncServerOption(func(o *serverOptions) {
361		o.statsHandler = h
362	})
363}
364
365// UnknownServiceHandler returns a ServerOption that allows for adding a custom
366// unknown service handler. The provided method is a bidi-streaming RPC service
367// handler that will be invoked instead of returning the "unimplemented" gRPC
368// error whenever a request is received for an unregistered service or method.
369// The handling function and stream interceptor (if set) have full access to
370// the ServerStream, including its Context.
371func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
372	return newFuncServerOption(func(o *serverOptions) {
373		o.unknownStreamDesc = &StreamDesc{
374			StreamName: "unknown_service_handler",
375			Handler:    streamHandler,
376			// We need to assume that the users of the streamHandler will want to use both.
377			ClientStreams: true,
378			ServerStreams: true,
379		}
380	})
381}
382
383// ConnectionTimeout returns a ServerOption that sets the timeout for
384// connection establishment (up to and including HTTP/2 handshaking) for all
385// new connections.  If this is not set, the default is 120 seconds.  A zero or
386// negative value will result in an immediate timeout.
387//
388// This API is EXPERIMENTAL.
389func ConnectionTimeout(d time.Duration) ServerOption {
390	return newFuncServerOption(func(o *serverOptions) {
391		o.connectionTimeout = d
392	})
393}
394
395// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
396// of header list that the server is prepared to accept.
397func MaxHeaderListSize(s uint32) ServerOption {
398	return newFuncServerOption(func(o *serverOptions) {
399		o.maxHeaderListSize = &s
400	})
401}
402
403// HeaderTableSize returns a ServerOption that sets the size of dynamic
404// header table for stream.
405//
406// This API is EXPERIMENTAL.
407func HeaderTableSize(s uint32) ServerOption {
408	return newFuncServerOption(func(o *serverOptions) {
409		o.headerTableSize = &s
410	})
411}
412
413// NewServer creates a gRPC server which has no service registered and has not
414// started to accept requests yet.
415func NewServer(opt ...ServerOption) *Server {
416	opts := defaultServerOptions
417	for _, o := range opt {
418		o.apply(&opts)
419	}
420	s := &Server{
421		lis:    make(map[net.Listener]bool),
422		opts:   opts,
423		conns:  make(map[transport.ServerTransport]bool),
424		m:      make(map[string]*service),
425		quit:   grpcsync.NewEvent(),
426		done:   grpcsync.NewEvent(),
427		czData: new(channelzData),
428	}
429	chainUnaryServerInterceptors(s)
430	chainStreamServerInterceptors(s)
431	s.cv = sync.NewCond(&s.mu)
432	if EnableTracing {
433		_, file, line, _ := runtime.Caller(1)
434		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
435	}
436
437	if channelz.IsOn() {
438		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
439	}
440	return s
441}
442
443// printf records an event in s's event log, unless s has been stopped.
444// REQUIRES s.mu is held.
445func (s *Server) printf(format string, a ...interface{}) {
446	if s.events != nil {
447		s.events.Printf(format, a...)
448	}
449}
450
451// errorf records an error in s's event log, unless s has been stopped.
452// REQUIRES s.mu is held.
453func (s *Server) errorf(format string, a ...interface{}) {
454	if s.events != nil {
455		s.events.Errorf(format, a...)
456	}
457}
458
459// RegisterService registers a service and its implementation to the gRPC
460// server. It is called from the IDL generated code. This must be called before
461// invoking Serve.
462func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
463	ht := reflect.TypeOf(sd.HandlerType).Elem()
464	st := reflect.TypeOf(ss)
465	if !st.Implements(ht) {
466		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
467	}
468	s.register(sd, ss)
469}
470
471func (s *Server) register(sd *ServiceDesc, ss interface{}) {
472	s.mu.Lock()
473	defer s.mu.Unlock()
474	s.printf("RegisterService(%q)", sd.ServiceName)
475	if s.serve {
476		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
477	}
478	if _, ok := s.m[sd.ServiceName]; ok {
479		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
480	}
481	srv := &service{
482		server: ss,
483		md:     make(map[string]*MethodDesc),
484		sd:     make(map[string]*StreamDesc),
485		mdata:  sd.Metadata,
486	}
487	for i := range sd.Methods {
488		d := &sd.Methods[i]
489		srv.md[d.MethodName] = d
490	}
491	for i := range sd.Streams {
492		d := &sd.Streams[i]
493		srv.sd[d.StreamName] = d
494	}
495	s.m[sd.ServiceName] = srv
496}
497
498// MethodInfo contains the information of an RPC including its method name and type.
499type MethodInfo struct {
500	// Name is the method name only, without the service name or package name.
501	Name string
502	// IsClientStream indicates whether the RPC is a client streaming RPC.
503	IsClientStream bool
504	// IsServerStream indicates whether the RPC is a server streaming RPC.
505	IsServerStream bool
506}
507
508// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
509type ServiceInfo struct {
510	Methods []MethodInfo
511	// Metadata is the metadata specified in ServiceDesc when registering service.
512	Metadata interface{}
513}
514
515// GetServiceInfo returns a map from service names to ServiceInfo.
516// Service names include the package names, in the form of <package>.<service>.
517func (s *Server) GetServiceInfo() map[string]ServiceInfo {
518	ret := make(map[string]ServiceInfo)
519	for n, srv := range s.m {
520		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
521		for m := range srv.md {
522			methods = append(methods, MethodInfo{
523				Name:           m,
524				IsClientStream: false,
525				IsServerStream: false,
526			})
527		}
528		for m, d := range srv.sd {
529			methods = append(methods, MethodInfo{
530				Name:           m,
531				IsClientStream: d.ClientStreams,
532				IsServerStream: d.ServerStreams,
533			})
534		}
535
536		ret[n] = ServiceInfo{
537			Methods:  methods,
538			Metadata: srv.mdata,
539		}
540	}
541	return ret
542}
543
544// ErrServerStopped indicates that the operation is now illegal because of
545// the server being stopped.
546var ErrServerStopped = errors.New("grpc: the server has been stopped")
547
548func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
549	if s.opts.creds == nil {
550		return rawConn, nil, nil
551	}
552	return s.opts.creds.ServerHandshake(rawConn)
553}
554
555type listenSocket struct {
556	net.Listener
557	channelzID int64
558}
559
560func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
561	return &channelz.SocketInternalMetric{
562		SocketOptions: channelz.GetSocketOption(l.Listener),
563		LocalAddr:     l.Listener.Addr(),
564	}
565}
566
567func (l *listenSocket) Close() error {
568	err := l.Listener.Close()
569	if channelz.IsOn() {
570		channelz.RemoveEntry(l.channelzID)
571	}
572	return err
573}
574
575// Serve accepts incoming connections on the listener lis, creating a new
576// ServerTransport and service goroutine for each. The service goroutines
577// read gRPC requests and then call the registered handlers to reply to them.
578// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
579// this method returns.
580// Serve will return a non-nil error unless Stop or GracefulStop is called.
581func (s *Server) Serve(lis net.Listener) error {
582	s.mu.Lock()
583	s.printf("serving")
584	s.serve = true
585	if s.lis == nil {
586		// Serve called after Stop or GracefulStop.
587		s.mu.Unlock()
588		lis.Close()
589		return ErrServerStopped
590	}
591
592	s.serveWG.Add(1)
593	defer func() {
594		s.serveWG.Done()
595		if s.quit.HasFired() {
596			// Stop or GracefulStop called; block until done and return nil.
597			<-s.done.Done()
598		}
599	}()
600
601	ls := &listenSocket{Listener: lis}
602	s.lis[ls] = true
603
604	if channelz.IsOn() {
605		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
606	}
607	s.mu.Unlock()
608
609	defer func() {
610		s.mu.Lock()
611		if s.lis != nil && s.lis[ls] {
612			ls.Close()
613			delete(s.lis, ls)
614		}
615		s.mu.Unlock()
616	}()
617
618	var tempDelay time.Duration // how long to sleep on accept failure
619
620	for {
621		rawConn, err := lis.Accept()
622		if err != nil {
623			if ne, ok := err.(interface {
624				Temporary() bool
625			}); ok && ne.Temporary() {
626				if tempDelay == 0 {
627					tempDelay = 5 * time.Millisecond
628				} else {
629					tempDelay *= 2
630				}
631				if max := 1 * time.Second; tempDelay > max {
632					tempDelay = max
633				}
634				s.mu.Lock()
635				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
636				s.mu.Unlock()
637				timer := time.NewTimer(tempDelay)
638				select {
639				case <-timer.C:
640				case <-s.quit.Done():
641					timer.Stop()
642					return nil
643				}
644				continue
645			}
646			s.mu.Lock()
647			s.printf("done serving; Accept = %v", err)
648			s.mu.Unlock()
649
650			if s.quit.HasFired() {
651				return nil
652			}
653			return err
654		}
655		tempDelay = 0
656		// Start a new goroutine to deal with rawConn so we don't stall this Accept
657		// loop goroutine.
658		//
659		// Make sure we account for the goroutine so GracefulStop doesn't nil out
660		// s.conns before this conn can be added.
661		s.serveWG.Add(1)
662		go func() {
663			s.handleRawConn(rawConn)
664			s.serveWG.Done()
665		}()
666	}
667}
668
669// handleRawConn forks a goroutine to handle a just-accepted connection that
670// has not had any I/O performed on it yet.
671func (s *Server) handleRawConn(rawConn net.Conn) {
672	if s.quit.HasFired() {
673		rawConn.Close()
674		return
675	}
676	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
677	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
678	if err != nil {
679		// ErrConnDispatched means that the connection was dispatched away from
680		// gRPC; those connections should be left open.
681		if err != credentials.ErrConnDispatched {
682			s.mu.Lock()
683			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
684			s.mu.Unlock()
685			channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
686			rawConn.Close()
687		}
688		rawConn.SetDeadline(time.Time{})
689		return
690	}
691
692	// Finish handshaking (HTTP2)
693	st := s.newHTTP2Transport(conn, authInfo)
694	if st == nil {
695		return
696	}
697
698	rawConn.SetDeadline(time.Time{})
699	if !s.addConn(st) {
700		return
701	}
702	go func() {
703		s.serveStreams(st)
704		s.removeConn(st)
705	}()
706}
707
708// newHTTP2Transport sets up a http/2 transport (using the
709// gRPC http2 server transport in transport/http2_server.go).
710func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
711	config := &transport.ServerConfig{
712		MaxStreams:            s.opts.maxConcurrentStreams,
713		AuthInfo:              authInfo,
714		InTapHandle:           s.opts.inTapHandle,
715		StatsHandler:          s.opts.statsHandler,
716		KeepaliveParams:       s.opts.keepaliveParams,
717		KeepalivePolicy:       s.opts.keepalivePolicy,
718		InitialWindowSize:     s.opts.initialWindowSize,
719		InitialConnWindowSize: s.opts.initialConnWindowSize,
720		WriteBufferSize:       s.opts.writeBufferSize,
721		ReadBufferSize:        s.opts.readBufferSize,
722		ChannelzParentID:      s.channelzID,
723		MaxHeaderListSize:     s.opts.maxHeaderListSize,
724		HeaderTableSize:       s.opts.headerTableSize,
725	}
726	st, err := transport.NewServerTransport("http2", c, config)
727	if err != nil {
728		s.mu.Lock()
729		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
730		s.mu.Unlock()
731		c.Close()
732		channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
733		return nil
734	}
735
736	return st
737}
738
739func (s *Server) serveStreams(st transport.ServerTransport) {
740	defer st.Close()
741	var wg sync.WaitGroup
742	st.HandleStreams(func(stream *transport.Stream) {
743		wg.Add(1)
744		go func() {
745			defer wg.Done()
746			s.handleStream(st, stream, s.traceInfo(st, stream))
747		}()
748	}, func(ctx context.Context, method string) context.Context {
749		if !EnableTracing {
750			return ctx
751		}
752		tr := trace.New("grpc.Recv."+methodFamily(method), method)
753		return trace.NewContext(ctx, tr)
754	})
755	wg.Wait()
756}
757
758var _ http.Handler = (*Server)(nil)
759
760// ServeHTTP implements the Go standard library's http.Handler
761// interface by responding to the gRPC request r, by looking up
762// the requested gRPC method in the gRPC server s.
763//
764// The provided HTTP request must have arrived on an HTTP/2
765// connection. When using the Go standard library's server,
766// practically this means that the Request must also have arrived
767// over TLS.
768//
769// To share one port (such as 443 for https) between gRPC and an
770// existing http.Handler, use a root http.Handler such as:
771//
772//   if r.ProtoMajor == 2 && strings.HasPrefix(
773//   	r.Header.Get("Content-Type"), "application/grpc") {
774//   	grpcServer.ServeHTTP(w, r)
775//   } else {
776//   	yourMux.ServeHTTP(w, r)
777//   }
778//
779// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
780// separate from grpc-go's HTTP/2 server. Performance and features may vary
781// between the two paths. ServeHTTP does not support some gRPC features
782// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
783// and subject to change.
784func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
785	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
786	if err != nil {
787		http.Error(w, err.Error(), http.StatusInternalServerError)
788		return
789	}
790	if !s.addConn(st) {
791		return
792	}
793	defer s.removeConn(st)
794	s.serveStreams(st)
795}
796
797// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
798// If tracing is not enabled, it returns nil.
799func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
800	if !EnableTracing {
801		return nil
802	}
803	tr, ok := trace.FromContext(stream.Context())
804	if !ok {
805		return nil
806	}
807
808	trInfo = &traceInfo{
809		tr: tr,
810		firstLine: firstLine{
811			client:     false,
812			remoteAddr: st.RemoteAddr(),
813		},
814	}
815	if dl, ok := stream.Context().Deadline(); ok {
816		trInfo.firstLine.deadline = time.Until(dl)
817	}
818	return trInfo
819}
820
821func (s *Server) addConn(st transport.ServerTransport) bool {
822	s.mu.Lock()
823	defer s.mu.Unlock()
824	if s.conns == nil {
825		st.Close()
826		return false
827	}
828	if s.drain {
829		// Transport added after we drained our existing conns: drain it
830		// immediately.
831		st.Drain()
832	}
833	s.conns[st] = true
834	return true
835}
836
837func (s *Server) removeConn(st transport.ServerTransport) {
838	s.mu.Lock()
839	defer s.mu.Unlock()
840	if s.conns != nil {
841		delete(s.conns, st)
842		s.cv.Broadcast()
843	}
844}
845
846func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
847	return &channelz.ServerInternalMetric{
848		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
849		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
850		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
851		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
852	}
853}
854
855func (s *Server) incrCallsStarted() {
856	atomic.AddInt64(&s.czData.callsStarted, 1)
857	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
858}
859
860func (s *Server) incrCallsSucceeded() {
861	atomic.AddInt64(&s.czData.callsSucceeded, 1)
862}
863
864func (s *Server) incrCallsFailed() {
865	atomic.AddInt64(&s.czData.callsFailed, 1)
866}
867
868func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
869	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
870	if err != nil {
871		channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
872		return err
873	}
874	compData, err := compress(data, cp, comp)
875	if err != nil {
876		channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
877		return err
878	}
879	hdr, payload := msgHeader(data, compData)
880	// TODO(dfawley): should we be checking len(data) instead?
881	if len(payload) > s.opts.maxSendMessageSize {
882		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
883	}
884	err = t.Write(stream, hdr, payload, opts)
885	if err == nil && s.opts.statsHandler != nil {
886		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
887	}
888	return err
889}
890
891// chainUnaryServerInterceptors chains all unary server interceptors into one.
892func chainUnaryServerInterceptors(s *Server) {
893	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
894	// be executed before any other chained interceptors.
895	interceptors := s.opts.chainUnaryInts
896	if s.opts.unaryInt != nil {
897		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
898	}
899
900	var chainedInt UnaryServerInterceptor
901	if len(interceptors) == 0 {
902		chainedInt = nil
903	} else if len(interceptors) == 1 {
904		chainedInt = interceptors[0]
905	} else {
906		chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
907			return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
908		}
909	}
910
911	s.opts.unaryInt = chainedInt
912}
913
914// getChainUnaryHandler recursively generate the chained UnaryHandler
915func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
916	if curr == len(interceptors)-1 {
917		return finalHandler
918	}
919
920	return func(ctx context.Context, req interface{}) (interface{}, error) {
921		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
922	}
923}
924
925func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
926	sh := s.opts.statsHandler
927	if sh != nil || trInfo != nil || channelz.IsOn() {
928		if channelz.IsOn() {
929			s.incrCallsStarted()
930		}
931		var statsBegin *stats.Begin
932		if sh != nil {
933			beginTime := time.Now()
934			statsBegin = &stats.Begin{
935				BeginTime: beginTime,
936			}
937			sh.HandleRPC(stream.Context(), statsBegin)
938		}
939		if trInfo != nil {
940			trInfo.tr.LazyLog(&trInfo.firstLine, false)
941		}
942		// The deferred error handling for tracing, stats handler and channelz are
943		// combined into one function to reduce stack usage -- a defer takes ~56-64
944		// bytes on the stack, so overflowing the stack will require a stack
945		// re-allocation, which is expensive.
946		//
947		// To maintain behavior similar to separate deferred statements, statements
948		// should be executed in the reverse order. That is, tracing first, stats
949		// handler second, and channelz last. Note that panics *within* defers will
950		// lead to different behavior, but that's an acceptable compromise; that
951		// would be undefined behavior territory anyway.
952		defer func() {
953			if trInfo != nil {
954				if err != nil && err != io.EOF {
955					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
956					trInfo.tr.SetError()
957				}
958				trInfo.tr.Finish()
959			}
960
961			if sh != nil {
962				end := &stats.End{
963					BeginTime: statsBegin.BeginTime,
964					EndTime:   time.Now(),
965				}
966				if err != nil && err != io.EOF {
967					end.Error = toRPCErr(err)
968				}
969				sh.HandleRPC(stream.Context(), end)
970			}
971
972			if channelz.IsOn() {
973				if err != nil && err != io.EOF {
974					s.incrCallsFailed()
975				} else {
976					s.incrCallsSucceeded()
977				}
978			}
979		}()
980	}
981
982	binlog := binarylog.GetMethodLogger(stream.Method())
983	if binlog != nil {
984		ctx := stream.Context()
985		md, _ := metadata.FromIncomingContext(ctx)
986		logEntry := &binarylog.ClientHeader{
987			Header:     md,
988			MethodName: stream.Method(),
989			PeerAddr:   nil,
990		}
991		if deadline, ok := ctx.Deadline(); ok {
992			logEntry.Timeout = time.Until(deadline)
993			if logEntry.Timeout < 0 {
994				logEntry.Timeout = 0
995			}
996		}
997		if a := md[":authority"]; len(a) > 0 {
998			logEntry.Authority = a[0]
999		}
1000		if peer, ok := peer.FromContext(ctx); ok {
1001			logEntry.PeerAddr = peer.Addr
1002		}
1003		binlog.Log(logEntry)
1004	}
1005
1006	// comp and cp are used for compression.  decomp and dc are used for
1007	// decompression.  If comp and decomp are both set, they are the same;
1008	// however they are kept separate to ensure that at most one of the
1009	// compressor/decompressor variable pairs are set for use later.
1010	var comp, decomp encoding.Compressor
1011	var cp Compressor
1012	var dc Decompressor
1013
1014	// If dc is set and matches the stream's compression, use it.  Otherwise, try
1015	// to find a matching registered compressor for decomp.
1016	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1017		dc = s.opts.dc
1018	} else if rc != "" && rc != encoding.Identity {
1019		decomp = encoding.GetCompressor(rc)
1020		if decomp == nil {
1021			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1022			t.WriteStatus(stream, st)
1023			return st.Err()
1024		}
1025	}
1026
1027	// If cp is set, use it.  Otherwise, attempt to compress the response using
1028	// the incoming message compression method.
1029	//
1030	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1031	if s.opts.cp != nil {
1032		cp = s.opts.cp
1033		stream.SetSendCompress(cp.Type())
1034	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1035		// Legacy compressor not specified; attempt to respond with same encoding.
1036		comp = encoding.GetCompressor(rc)
1037		if comp != nil {
1038			stream.SetSendCompress(rc)
1039		}
1040	}
1041
1042	var payInfo *payloadInfo
1043	if sh != nil || binlog != nil {
1044		payInfo = &payloadInfo{}
1045	}
1046	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
1047	if err != nil {
1048		if st, ok := status.FromError(err); ok {
1049			if e := t.WriteStatus(stream, st); e != nil {
1050				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
1051			}
1052		}
1053		return err
1054	}
1055	if channelz.IsOn() {
1056		t.IncrMsgRecv()
1057	}
1058	df := func(v interface{}) error {
1059		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
1060			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
1061		}
1062		if sh != nil {
1063			sh.HandleRPC(stream.Context(), &stats.InPayload{
1064				RecvTime:   time.Now(),
1065				Payload:    v,
1066				WireLength: payInfo.wireLength,
1067				Data:       d,
1068				Length:     len(d),
1069			})
1070		}
1071		if binlog != nil {
1072			binlog.Log(&binarylog.ClientMessage{
1073				Message: d,
1074			})
1075		}
1076		if trInfo != nil {
1077			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
1078		}
1079		return nil
1080	}
1081	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1082	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
1083	if appErr != nil {
1084		appStatus, ok := status.FromError(appErr)
1085		if !ok {
1086			// Convert appErr if it is not a grpc status error.
1087			appErr = status.Error(codes.Unknown, appErr.Error())
1088			appStatus, _ = status.FromError(appErr)
1089		}
1090		if trInfo != nil {
1091			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1092			trInfo.tr.SetError()
1093		}
1094		if e := t.WriteStatus(stream, appStatus); e != nil {
1095			channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
1096		}
1097		if binlog != nil {
1098			if h, _ := stream.Header(); h.Len() > 0 {
1099				// Only log serverHeader if there was header. Otherwise it can
1100				// be trailer only.
1101				binlog.Log(&binarylog.ServerHeader{
1102					Header: h,
1103				})
1104			}
1105			binlog.Log(&binarylog.ServerTrailer{
1106				Trailer: stream.Trailer(),
1107				Err:     appErr,
1108			})
1109		}
1110		return appErr
1111	}
1112	if trInfo != nil {
1113		trInfo.tr.LazyLog(stringer("OK"), false)
1114	}
1115	opts := &transport.Options{Last: true}
1116
1117	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1118		if err == io.EOF {
1119			// The entire stream is done (for unary RPC only).
1120			return err
1121		}
1122		if sts, ok := status.FromError(err); ok {
1123			if e := t.WriteStatus(stream, sts); e != nil {
1124				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
1125			}
1126		} else {
1127			switch st := err.(type) {
1128			case transport.ConnectionError:
1129				// Nothing to do here.
1130			default:
1131				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1132			}
1133		}
1134		if binlog != nil {
1135			h, _ := stream.Header()
1136			binlog.Log(&binarylog.ServerHeader{
1137				Header: h,
1138			})
1139			binlog.Log(&binarylog.ServerTrailer{
1140				Trailer: stream.Trailer(),
1141				Err:     appErr,
1142			})
1143		}
1144		return err
1145	}
1146	if binlog != nil {
1147		h, _ := stream.Header()
1148		binlog.Log(&binarylog.ServerHeader{
1149			Header: h,
1150		})
1151		binlog.Log(&binarylog.ServerMessage{
1152			Message: reply,
1153		})
1154	}
1155	if channelz.IsOn() {
1156		t.IncrMsgSent()
1157	}
1158	if trInfo != nil {
1159		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1160	}
1161	// TODO: Should we be logging if writing status failed here, like above?
1162	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
1163	// error or allow the stats handler to see it?
1164	err = t.WriteStatus(stream, statusOK)
1165	if binlog != nil {
1166		binlog.Log(&binarylog.ServerTrailer{
1167			Trailer: stream.Trailer(),
1168			Err:     appErr,
1169		})
1170	}
1171	return err
1172}
1173
1174// chainStreamServerInterceptors chains all stream server interceptors into one.
1175func chainStreamServerInterceptors(s *Server) {
1176	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1177	// be executed before any other chained interceptors.
1178	interceptors := s.opts.chainStreamInts
1179	if s.opts.streamInt != nil {
1180		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1181	}
1182
1183	var chainedInt StreamServerInterceptor
1184	if len(interceptors) == 0 {
1185		chainedInt = nil
1186	} else if len(interceptors) == 1 {
1187		chainedInt = interceptors[0]
1188	} else {
1189		chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
1190			return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
1191		}
1192	}
1193
1194	s.opts.streamInt = chainedInt
1195}
1196
1197// getChainStreamHandler recursively generate the chained StreamHandler
1198func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
1199	if curr == len(interceptors)-1 {
1200		return finalHandler
1201	}
1202
1203	return func(srv interface{}, ss ServerStream) error {
1204		return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
1205	}
1206}
1207
1208func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1209	if channelz.IsOn() {
1210		s.incrCallsStarted()
1211	}
1212	sh := s.opts.statsHandler
1213	var statsBegin *stats.Begin
1214	if sh != nil {
1215		beginTime := time.Now()
1216		statsBegin = &stats.Begin{
1217			BeginTime: beginTime,
1218		}
1219		sh.HandleRPC(stream.Context(), statsBegin)
1220	}
1221	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1222	ss := &serverStream{
1223		ctx:                   ctx,
1224		t:                     t,
1225		s:                     stream,
1226		p:                     &parser{r: stream},
1227		codec:                 s.getCodec(stream.ContentSubtype()),
1228		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1229		maxSendMessageSize:    s.opts.maxSendMessageSize,
1230		trInfo:                trInfo,
1231		statsHandler:          sh,
1232	}
1233
1234	if sh != nil || trInfo != nil || channelz.IsOn() {
1235		// See comment in processUnaryRPC on defers.
1236		defer func() {
1237			if trInfo != nil {
1238				ss.mu.Lock()
1239				if err != nil && err != io.EOF {
1240					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1241					ss.trInfo.tr.SetError()
1242				}
1243				ss.trInfo.tr.Finish()
1244				ss.trInfo.tr = nil
1245				ss.mu.Unlock()
1246			}
1247
1248			if sh != nil {
1249				end := &stats.End{
1250					BeginTime: statsBegin.BeginTime,
1251					EndTime:   time.Now(),
1252				}
1253				if err != nil && err != io.EOF {
1254					end.Error = toRPCErr(err)
1255				}
1256				sh.HandleRPC(stream.Context(), end)
1257			}
1258
1259			if channelz.IsOn() {
1260				if err != nil && err != io.EOF {
1261					s.incrCallsFailed()
1262				} else {
1263					s.incrCallsSucceeded()
1264				}
1265			}
1266		}()
1267	}
1268
1269	ss.binlog = binarylog.GetMethodLogger(stream.Method())
1270	if ss.binlog != nil {
1271		md, _ := metadata.FromIncomingContext(ctx)
1272		logEntry := &binarylog.ClientHeader{
1273			Header:     md,
1274			MethodName: stream.Method(),
1275			PeerAddr:   nil,
1276		}
1277		if deadline, ok := ctx.Deadline(); ok {
1278			logEntry.Timeout = time.Until(deadline)
1279			if logEntry.Timeout < 0 {
1280				logEntry.Timeout = 0
1281			}
1282		}
1283		if a := md[":authority"]; len(a) > 0 {
1284			logEntry.Authority = a[0]
1285		}
1286		if peer, ok := peer.FromContext(ss.Context()); ok {
1287			logEntry.PeerAddr = peer.Addr
1288		}
1289		ss.binlog.Log(logEntry)
1290	}
1291
1292	// If dc is set and matches the stream's compression, use it.  Otherwise, try
1293	// to find a matching registered compressor for decomp.
1294	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1295		ss.dc = s.opts.dc
1296	} else if rc != "" && rc != encoding.Identity {
1297		ss.decomp = encoding.GetCompressor(rc)
1298		if ss.decomp == nil {
1299			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1300			t.WriteStatus(ss.s, st)
1301			return st.Err()
1302		}
1303	}
1304
1305	// If cp is set, use it.  Otherwise, attempt to compress the response using
1306	// the incoming message compression method.
1307	//
1308	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1309	if s.opts.cp != nil {
1310		ss.cp = s.opts.cp
1311		stream.SetSendCompress(s.opts.cp.Type())
1312	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1313		// Legacy compressor not specified; attempt to respond with same encoding.
1314		ss.comp = encoding.GetCompressor(rc)
1315		if ss.comp != nil {
1316			stream.SetSendCompress(rc)
1317		}
1318	}
1319
1320	if trInfo != nil {
1321		trInfo.tr.LazyLog(&trInfo.firstLine, false)
1322	}
1323	var appErr error
1324	var server interface{}
1325	if srv != nil {
1326		server = srv.server
1327	}
1328	if s.opts.streamInt == nil {
1329		appErr = sd.Handler(server, ss)
1330	} else {
1331		info := &StreamServerInfo{
1332			FullMethod:     stream.Method(),
1333			IsClientStream: sd.ClientStreams,
1334			IsServerStream: sd.ServerStreams,
1335		}
1336		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1337	}
1338	if appErr != nil {
1339		appStatus, ok := status.FromError(appErr)
1340		if !ok {
1341			appStatus = status.New(codes.Unknown, appErr.Error())
1342			appErr = appStatus.Err()
1343		}
1344		if trInfo != nil {
1345			ss.mu.Lock()
1346			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1347			ss.trInfo.tr.SetError()
1348			ss.mu.Unlock()
1349		}
1350		t.WriteStatus(ss.s, appStatus)
1351		if ss.binlog != nil {
1352			ss.binlog.Log(&binarylog.ServerTrailer{
1353				Trailer: ss.s.Trailer(),
1354				Err:     appErr,
1355			})
1356		}
1357		// TODO: Should we log an error from WriteStatus here and below?
1358		return appErr
1359	}
1360	if trInfo != nil {
1361		ss.mu.Lock()
1362		ss.trInfo.tr.LazyLog(stringer("OK"), false)
1363		ss.mu.Unlock()
1364	}
1365	err = t.WriteStatus(ss.s, statusOK)
1366	if ss.binlog != nil {
1367		ss.binlog.Log(&binarylog.ServerTrailer{
1368			Trailer: ss.s.Trailer(),
1369			Err:     appErr,
1370		})
1371	}
1372	return err
1373}
1374
1375func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1376	sm := stream.Method()
1377	if sm != "" && sm[0] == '/' {
1378		sm = sm[1:]
1379	}
1380	pos := strings.LastIndex(sm, "/")
1381	if pos == -1 {
1382		if trInfo != nil {
1383			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1384			trInfo.tr.SetError()
1385		}
1386		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1387		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1388			if trInfo != nil {
1389				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1390				trInfo.tr.SetError()
1391			}
1392			channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
1393		}
1394		if trInfo != nil {
1395			trInfo.tr.Finish()
1396		}
1397		return
1398	}
1399	service := sm[:pos]
1400	method := sm[pos+1:]
1401
1402	srv, knownService := s.m[service]
1403	if knownService {
1404		if md, ok := srv.md[method]; ok {
1405			s.processUnaryRPC(t, stream, srv, md, trInfo)
1406			return
1407		}
1408		if sd, ok := srv.sd[method]; ok {
1409			s.processStreamingRPC(t, stream, srv, sd, trInfo)
1410			return
1411		}
1412	}
1413	// Unknown service, or known server unknown method.
1414	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1415		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1416		return
1417	}
1418	var errDesc string
1419	if !knownService {
1420		errDesc = fmt.Sprintf("unknown service %v", service)
1421	} else {
1422		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1423	}
1424	if trInfo != nil {
1425		trInfo.tr.LazyPrintf("%s", errDesc)
1426		trInfo.tr.SetError()
1427	}
1428	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1429		if trInfo != nil {
1430			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1431			trInfo.tr.SetError()
1432		}
1433		channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
1434	}
1435	if trInfo != nil {
1436		trInfo.tr.Finish()
1437	}
1438}
1439
1440// The key to save ServerTransportStream in the context.
1441type streamKey struct{}
1442
1443// NewContextWithServerTransportStream creates a new context from ctx and
1444// attaches stream to it.
1445//
1446// This API is EXPERIMENTAL.
1447func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1448	return context.WithValue(ctx, streamKey{}, stream)
1449}
1450
1451// ServerTransportStream is a minimal interface that a transport stream must
1452// implement. This can be used to mock an actual transport stream for tests of
1453// handler code that use, for example, grpc.SetHeader (which requires some
1454// stream to be in context).
1455//
1456// See also NewContextWithServerTransportStream.
1457//
1458// This API is EXPERIMENTAL.
1459type ServerTransportStream interface {
1460	Method() string
1461	SetHeader(md metadata.MD) error
1462	SendHeader(md metadata.MD) error
1463	SetTrailer(md metadata.MD) error
1464}
1465
1466// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1467// ctx. Returns nil if the given context has no stream associated with it
1468// (which implies it is not an RPC invocation context).
1469//
1470// This API is EXPERIMENTAL.
1471func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1472	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1473	return s
1474}
1475
1476// Stop stops the gRPC server. It immediately closes all open
1477// connections and listeners.
1478// It cancels all active RPCs on the server side and the corresponding
1479// pending RPCs on the client side will get notified by connection
1480// errors.
1481func (s *Server) Stop() {
1482	s.quit.Fire()
1483
1484	defer func() {
1485		s.serveWG.Wait()
1486		s.done.Fire()
1487	}()
1488
1489	s.channelzRemoveOnce.Do(func() {
1490		if channelz.IsOn() {
1491			channelz.RemoveEntry(s.channelzID)
1492		}
1493	})
1494
1495	s.mu.Lock()
1496	listeners := s.lis
1497	s.lis = nil
1498	st := s.conns
1499	s.conns = nil
1500	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1501	s.cv.Broadcast()
1502	s.mu.Unlock()
1503
1504	for lis := range listeners {
1505		lis.Close()
1506	}
1507	for c := range st {
1508		c.Close()
1509	}
1510
1511	s.mu.Lock()
1512	if s.events != nil {
1513		s.events.Finish()
1514		s.events = nil
1515	}
1516	s.mu.Unlock()
1517}
1518
1519// GracefulStop stops the gRPC server gracefully. It stops the server from
1520// accepting new connections and RPCs and blocks until all the pending RPCs are
1521// finished.
1522func (s *Server) GracefulStop() {
1523	s.quit.Fire()
1524	defer s.done.Fire()
1525
1526	s.channelzRemoveOnce.Do(func() {
1527		if channelz.IsOn() {
1528			channelz.RemoveEntry(s.channelzID)
1529		}
1530	})
1531	s.mu.Lock()
1532	if s.conns == nil {
1533		s.mu.Unlock()
1534		return
1535	}
1536
1537	for lis := range s.lis {
1538		lis.Close()
1539	}
1540	s.lis = nil
1541	if !s.drain {
1542		for st := range s.conns {
1543			st.Drain()
1544		}
1545		s.drain = true
1546	}
1547
1548	// Wait for serving threads to be ready to exit.  Only then can we be sure no
1549	// new conns will be created.
1550	s.mu.Unlock()
1551	s.serveWG.Wait()
1552	s.mu.Lock()
1553
1554	for len(s.conns) != 0 {
1555		s.cv.Wait()
1556	}
1557	s.conns = nil
1558	if s.events != nil {
1559		s.events.Finish()
1560		s.events = nil
1561	}
1562	s.mu.Unlock()
1563}
1564
1565// contentSubtype must be lowercase
1566// cannot return nil
1567func (s *Server) getCodec(contentSubtype string) baseCodec {
1568	if s.opts.codec != nil {
1569		return s.opts.codec
1570	}
1571	if contentSubtype == "" {
1572		return encoding.GetCodec(proto.Name)
1573	}
1574	codec := encoding.GetCodec(contentSubtype)
1575	if codec == nil {
1576		return encoding.GetCodec(proto.Name)
1577	}
1578	return codec
1579}
1580
1581// SetHeader sets the header metadata.
1582// When called multiple times, all the provided metadata will be merged.
1583// All the metadata will be sent out when one of the following happens:
1584//  - grpc.SendHeader() is called;
1585//  - The first response is sent out;
1586//  - An RPC status is sent out (error or success).
1587func SetHeader(ctx context.Context, md metadata.MD) error {
1588	if md.Len() == 0 {
1589		return nil
1590	}
1591	stream := ServerTransportStreamFromContext(ctx)
1592	if stream == nil {
1593		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1594	}
1595	return stream.SetHeader(md)
1596}
1597
1598// SendHeader sends header metadata. It may be called at most once.
1599// The provided md and headers set by SetHeader() will be sent.
1600func SendHeader(ctx context.Context, md metadata.MD) error {
1601	stream := ServerTransportStreamFromContext(ctx)
1602	if stream == nil {
1603		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1604	}
1605	if err := stream.SendHeader(md); err != nil {
1606		return toRPCErr(err)
1607	}
1608	return nil
1609}
1610
1611// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1612// When called more than once, all the provided metadata will be merged.
1613func SetTrailer(ctx context.Context, md metadata.MD) error {
1614	if md.Len() == 0 {
1615		return nil
1616	}
1617	stream := ServerTransportStreamFromContext(ctx)
1618	if stream == nil {
1619		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1620	}
1621	return stream.SetTrailer(md)
1622}
1623
1624// Method returns the method string for the server context.  The returned
1625// string is in the format of "/service/method".
1626func Method(ctx context.Context) (string, bool) {
1627	s := ServerTransportStreamFromContext(ctx)
1628	if s == nil {
1629		return "", false
1630	}
1631	return s.Method(), true
1632}
1633
1634type channelzServer struct {
1635	s *Server
1636}
1637
1638func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1639	return c.s.channelzMetric()
1640}
1641