1package grpc_prometheus
2
3import (
4	"io"
5
6	prom "github.com/prometheus/client_golang/prometheus"
7	"golang.org/x/net/context"
8	"google.golang.org/grpc"
9	"google.golang.org/grpc/codes"
10	"google.golang.org/grpc/status"
11)
12
13// ClientMetrics represents a collection of metrics to be registered on a
14// Prometheus metrics registry for a gRPC client.
15type ClientMetrics struct {
16	clientStartedCounter          *prom.CounterVec
17	clientHandledCounter          *prom.CounterVec
18	clientStreamMsgReceived       *prom.CounterVec
19	clientStreamMsgSent           *prom.CounterVec
20	clientHandledHistogramEnabled bool
21	clientHandledHistogramOpts    prom.HistogramOpts
22	clientHandledHistogram        *prom.HistogramVec
23}
24
25// NewClientMetrics returns a ClientMetrics object. Use a new instance of
26// ClientMetrics when not using the default Prometheus metrics registry, for
27// example when wanting to control which metrics are added to a registry as
28// opposed to automatically adding metrics via init functions.
29func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
30	opts := counterOptions(counterOpts)
31	return &ClientMetrics{
32		clientStartedCounter: prom.NewCounterVec(
33			opts.apply(prom.CounterOpts{
34				Name: "grpc_client_started_total",
35				Help: "Total number of RPCs started on the client.",
36			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
37
38		clientHandledCounter: prom.NewCounterVec(
39			opts.apply(prom.CounterOpts{
40				Name: "grpc_client_handled_total",
41				Help: "Total number of RPCs completed by the client, regardless of success or failure.",
42			}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
43
44		clientStreamMsgReceived: prom.NewCounterVec(
45			opts.apply(prom.CounterOpts{
46				Name: "grpc_client_msg_received_total",
47				Help: "Total number of RPC stream messages received by the client.",
48			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
49
50		clientStreamMsgSent: prom.NewCounterVec(
51			opts.apply(prom.CounterOpts{
52				Name: "grpc_client_msg_sent_total",
53				Help: "Total number of gRPC stream messages sent by the client.",
54			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
55
56		clientHandledHistogramEnabled: false,
57		clientHandledHistogramOpts: prom.HistogramOpts{
58			Name:    "grpc_client_handling_seconds",
59			Help:    "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
60			Buckets: prom.DefBuckets,
61		},
62		clientHandledHistogram: nil,
63	}
64}
65
66// Describe sends the super-set of all possible descriptors of metrics
67// collected by this Collector to the provided channel and returns once
68// the last descriptor has been sent.
69func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
70	m.clientStartedCounter.Describe(ch)
71	m.clientHandledCounter.Describe(ch)
72	m.clientStreamMsgReceived.Describe(ch)
73	m.clientStreamMsgSent.Describe(ch)
74	if m.clientHandledHistogramEnabled {
75		m.clientHandledHistogram.Describe(ch)
76	}
77}
78
79// Collect is called by the Prometheus registry when collecting
80// metrics. The implementation sends each collected metric via the
81// provided channel and returns once the last metric has been sent.
82func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
83	m.clientStartedCounter.Collect(ch)
84	m.clientHandledCounter.Collect(ch)
85	m.clientStreamMsgReceived.Collect(ch)
86	m.clientStreamMsgSent.Collect(ch)
87	if m.clientHandledHistogramEnabled {
88		m.clientHandledHistogram.Collect(ch)
89	}
90}
91
92// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
93// Histogram metrics can be very expensive for Prometheus to retain and query.
94func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
95	for _, o := range opts {
96		o(&m.clientHandledHistogramOpts)
97	}
98	if !m.clientHandledHistogramEnabled {
99		m.clientHandledHistogram = prom.NewHistogramVec(
100			m.clientHandledHistogramOpts,
101			[]string{"grpc_type", "grpc_service", "grpc_method"},
102		)
103	}
104	m.clientHandledHistogramEnabled = true
105}
106
107// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
108func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
109	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
110		monitor := newClientReporter(m, Unary, method)
111		monitor.SentMessage()
112		err := invoker(ctx, method, req, reply, cc, opts...)
113		if err != nil {
114			monitor.ReceivedMessage()
115		}
116		st, _ := status.FromError(err)
117		monitor.Handled(st.Code())
118		return err
119	}
120}
121
122// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
123func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
124	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
125		monitor := newClientReporter(m, clientStreamType(desc), method)
126		clientStream, err := streamer(ctx, desc, cc, method, opts...)
127		if err != nil {
128			st, _ := status.FromError(err)
129			monitor.Handled(st.Code())
130			return nil, err
131		}
132		return &monitoredClientStream{clientStream, monitor}, nil
133	}
134}
135
136func clientStreamType(desc *grpc.StreamDesc) grpcType {
137	if desc.ClientStreams && !desc.ServerStreams {
138		return ClientStream
139	} else if !desc.ClientStreams && desc.ServerStreams {
140		return ServerStream
141	}
142	return BidiStream
143}
144
145// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
146type monitoredClientStream struct {
147	grpc.ClientStream
148	monitor *clientReporter
149}
150
151func (s *monitoredClientStream) SendMsg(m interface{}) error {
152	err := s.ClientStream.SendMsg(m)
153	if err == nil {
154		s.monitor.SentMessage()
155	}
156	return err
157}
158
159func (s *monitoredClientStream) RecvMsg(m interface{}) error {
160	err := s.ClientStream.RecvMsg(m)
161	if err == nil {
162		s.monitor.ReceivedMessage()
163	} else if err == io.EOF {
164		s.monitor.Handled(codes.OK)
165	} else {
166		st, _ := status.FromError(err)
167		s.monitor.Handled(st.Code())
168	}
169	return err
170}
171