1package grpc_prometheus
2
3import (
4	prom "github.com/prometheus/client_golang/prometheus"
5	"golang.org/x/net/context"
6	"google.golang.org/grpc"
7	"google.golang.org/grpc/status"
8)
9
// ServerMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC server.
//
// The four counter vectors are always created by NewServerMetrics:
//   - serverStartedCounter ("grpc_server_started_total")
//   - serverHandledCounter ("grpc_server_handled_total", additionally
//     labelled by grpc_code)
//   - serverStreamMsgReceived ("grpc_server_msg_received_total")
//   - serverStreamMsgSent ("grpc_server_msg_sent_total")
//
// The handling-time histogram is opt-in: serverHandledHistogram stays nil and
// serverHandledHistogramEnabled stays false until EnableHandlingTimeHistogram
// is called, which builds the histogram from serverHandledHistogramOpts.
// Describe and Collect only touch the histogram when the enabled flag is set.
type ServerMetrics struct {
	serverStartedCounter          *prom.CounterVec
	serverHandledCounter          *prom.CounterVec
	serverStreamMsgReceived       *prom.CounterVec
	serverStreamMsgSent           *prom.CounterVec
	serverHandledHistogramEnabled bool
	serverHandledHistogramOpts    prom.HistogramOpts
	serverHandledHistogram        *prom.HistogramVec
}
21
22// NewServerMetrics returns a ServerMetrics object. Use a new instance of
23// ServerMetrics when not using the default Prometheus metrics registry, for
24// example when wanting to control which metrics are added to a registry as
25// opposed to automatically adding metrics via init functions.
26func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
27	opts := counterOptions(counterOpts)
28	return &ServerMetrics{
29		serverStartedCounter: prom.NewCounterVec(
30			opts.apply(prom.CounterOpts{
31				Name: "grpc_server_started_total",
32				Help: "Total number of RPCs started on the server.",
33			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
34		serverHandledCounter: prom.NewCounterVec(
35			opts.apply(prom.CounterOpts{
36				Name: "grpc_server_handled_total",
37				Help: "Total number of RPCs completed on the server, regardless of success or failure.",
38			}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
39		serverStreamMsgReceived: prom.NewCounterVec(
40			opts.apply(prom.CounterOpts{
41				Name: "grpc_server_msg_received_total",
42				Help: "Total number of RPC stream messages received on the server.",
43			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
44		serverStreamMsgSent: prom.NewCounterVec(
45			opts.apply(prom.CounterOpts{
46				Name: "grpc_server_msg_sent_total",
47				Help: "Total number of gRPC stream messages sent by the server.",
48			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
49		serverHandledHistogramEnabled: false,
50		serverHandledHistogramOpts: prom.HistogramOpts{
51			Name:    "grpc_server_handling_seconds",
52			Help:    "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
53			Buckets: prom.DefBuckets,
54		},
55		serverHandledHistogram: nil,
56	}
57}
58
59// EnableHandlingTimeHistogram enables histograms being registered when
60// registering the ServerMetrics on a Prometheus registry. Histograms can be
61// expensive on Prometheus servers. It takes options to configure histogram
62// options such as the defined buckets.
63func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
64	for _, o := range opts {
65		o(&m.serverHandledHistogramOpts)
66	}
67	if !m.serverHandledHistogramEnabled {
68		m.serverHandledHistogram = prom.NewHistogramVec(
69			m.serverHandledHistogramOpts,
70			[]string{"grpc_type", "grpc_service", "grpc_method"},
71		)
72	}
73	m.serverHandledHistogramEnabled = true
74}
75
76// Describe sends the super-set of all possible descriptors of metrics
77// collected by this Collector to the provided channel and returns once
78// the last descriptor has been sent.
79func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
80	m.serverStartedCounter.Describe(ch)
81	m.serverHandledCounter.Describe(ch)
82	m.serverStreamMsgReceived.Describe(ch)
83	m.serverStreamMsgSent.Describe(ch)
84	if m.serverHandledHistogramEnabled {
85		m.serverHandledHistogram.Describe(ch)
86	}
87}
88
89// Collect is called by the Prometheus registry when collecting
90// metrics. The implementation sends each collected metric via the
91// provided channel and returns once the last metric has been sent.
92func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
93	m.serverStartedCounter.Collect(ch)
94	m.serverHandledCounter.Collect(ch)
95	m.serverStreamMsgReceived.Collect(ch)
96	m.serverStreamMsgSent.Collect(ch)
97	if m.serverHandledHistogramEnabled {
98		m.serverHandledHistogram.Collect(ch)
99	}
100}
101
102// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
103func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
104	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
105		monitor := newServerReporter(m, Unary, info.FullMethod)
106		monitor.ReceivedMessage()
107		resp, err := handler(ctx, req)
108		st, _ := status.FromError(err)
109		monitor.Handled(st.Code())
110		if err == nil {
111			monitor.SentMessage()
112		}
113		return resp, err
114	}
115}
116
117// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
118func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
119	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
120		monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
121		err := handler(srv, &monitoredServerStream{ss, monitor})
122		st, _ := status.FromError(err)
123		monitor.Handled(st.Code())
124		return err
125	}
126}
127
128// InitializeMetrics initializes all metrics, with their appropriate null
129// value, for all gRPC methods registered on a gRPC server. This is useful, to
130// ensure that all metrics exist when collecting and querying.
131func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
132	serviceInfo := server.GetServiceInfo()
133	for serviceName, info := range serviceInfo {
134		for _, mInfo := range info.Methods {
135			preRegisterMethod(m, serviceName, &mInfo)
136		}
137	}
138}
139
140func streamRPCType(info *grpc.StreamServerInfo) grpcType {
141	if info.IsClientStream && !info.IsServerStream {
142		return ClientStream
143	} else if !info.IsClientStream && info.IsServerStream {
144		return ServerStream
145	}
146	return BidiStream
147}
148
// monitoredServerStream wraps grpc.ServerStream so that every successful
// SendMsg/RecvMsg increments the corresponding message counter on monitor.
// All other ServerStream methods are provided by the embedded stream.
type monitoredServerStream struct {
	grpc.ServerStream
	monitor *serverReporter
}
154
155func (s *monitoredServerStream) SendMsg(m interface{}) error {
156	err := s.ServerStream.SendMsg(m)
157	if err == nil {
158		s.monitor.SentMessage()
159	}
160	return err
161}
162
163func (s *monitoredServerStream) RecvMsg(m interface{}) error {
164	err := s.ServerStream.RecvMsg(m)
165	if err == nil {
166		s.monitor.ReceivedMessage()
167	}
168	return err
169}
170
171// preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
172func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
173	methodName := mInfo.Name
174	methodType := string(typeFromMethodInfo(mInfo))
175	// These are just references (no increments), as just referencing will create the labels but not set values.
176	metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
177	metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
178	metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
179	if metrics.serverHandledHistogramEnabled {
180		metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
181	}
182	for _, code := range allCodes {
183		metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
184	}
185}
186