1package grpc_prometheus 2 3import ( 4 "io" 5 6 prom "github.com/prometheus/client_golang/prometheus" 7 "golang.org/x/net/context" 8 "google.golang.org/grpc" 9 "google.golang.org/grpc/codes" 10 "google.golang.org/grpc/status" 11) 12 13// ClientMetrics represents a collection of metrics to be registered on a 14// Prometheus metrics registry for a gRPC client. 15type ClientMetrics struct { 16 clientStartedCounter *prom.CounterVec 17 clientHandledCounter *prom.CounterVec 18 clientStreamMsgReceived *prom.CounterVec 19 clientStreamMsgSent *prom.CounterVec 20 clientHandledHistogramEnabled bool 21 clientHandledHistogramOpts prom.HistogramOpts 22 clientHandledHistogram *prom.HistogramVec 23} 24 25// NewClientMetrics returns a ClientMetrics object. Use a new instance of 26// ClientMetrics when not using the default Prometheus metrics registry, for 27// example when wanting to control which metrics are added to a registry as 28// opposed to automatically adding metrics via init functions. 29func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics { 30 opts := counterOptions(counterOpts) 31 return &ClientMetrics{ 32 clientStartedCounter: prom.NewCounterVec( 33 opts.apply(prom.CounterOpts{ 34 Name: "grpc_client_started_total", 35 Help: "Total number of RPCs started on the client.", 36 }), []string{"grpc_type", "grpc_service", "grpc_method"}), 37 38 clientHandledCounter: prom.NewCounterVec( 39 opts.apply(prom.CounterOpts{ 40 Name: "grpc_client_handled_total", 41 Help: "Total number of RPCs completed by the client, regardless of success or failure.", 42 }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}), 43 44 clientStreamMsgReceived: prom.NewCounterVec( 45 opts.apply(prom.CounterOpts{ 46 Name: "grpc_client_msg_received_total", 47 Help: "Total number of RPC stream messages received by the client.", 48 }), []string{"grpc_type", "grpc_service", "grpc_method"}), 49 50 clientStreamMsgSent: prom.NewCounterVec( 51 opts.apply(prom.CounterOpts{ 52 Name: "grpc_client_msg_sent_total", 53 Help: "Total number of gRPC stream messages sent by the client.", 54 }), []string{"grpc_type", "grpc_service", "grpc_method"}), 55 56 clientHandledHistogramEnabled: false, 57 clientHandledHistogramOpts: prom.HistogramOpts{ 58 Name: "grpc_client_handling_seconds", 59 Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.", 60 Buckets: prom.DefBuckets, 61 }, 62 clientHandledHistogram: nil, 63 } 64} 65 66// Describe sends the super-set of all possible descriptors of metrics 67// collected by this Collector to the provided channel and returns once 68// the last descriptor has been sent. 69func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) { 70 m.clientStartedCounter.Describe(ch) 71 m.clientHandledCounter.Describe(ch) 72 m.clientStreamMsgReceived.Describe(ch) 73 m.clientStreamMsgSent.Describe(ch) 74 if m.clientHandledHistogramEnabled { 75 m.clientHandledHistogram.Describe(ch) 76 } 77} 78 79// Collect is called by the Prometheus registry when collecting 80// metrics. The implementation sends each collected metric via the 81// provided channel and returns once the last metric has been sent. 82func (m *ClientMetrics) Collect(ch chan<- prom.Metric) { 83 m.clientStartedCounter.Collect(ch) 84 m.clientHandledCounter.Collect(ch) 85 m.clientStreamMsgReceived.Collect(ch) 86 m.clientStreamMsgSent.Collect(ch) 87 if m.clientHandledHistogramEnabled { 88 m.clientHandledHistogram.Collect(ch) 89 } 90} 91 92// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs. 93// Histogram metrics can be very expensive for Prometheus to retain and query. 94func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) { 95 for _, o := range opts { 96 o(&m.clientHandledHistogramOpts) 97 } 98 if !m.clientHandledHistogramEnabled { 99 m.clientHandledHistogram = prom.NewHistogramVec( 100 m.clientHandledHistogramOpts, 101 []string{"grpc_type", "grpc_service", "grpc_method"}, 102 ) 103 } 104 m.clientHandledHistogramEnabled = true 105} 106 107// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs. 108func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 109 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 110 monitor := newClientReporter(m, Unary, method) 111 monitor.SentMessage() 112 err := invoker(ctx, method, req, reply, cc, opts...) 113 if err != nil { 114 monitor.ReceivedMessage() 115 } 116 st, _ := status.FromError(err) 117 monitor.Handled(st.Code()) 118 return err 119 } 120} 121 122// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs. 123func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 124 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 125 monitor := newClientReporter(m, clientStreamType(desc), method) 126 clientStream, err := streamer(ctx, desc, cc, method, opts...) 127 if err != nil { 128 st, _ := status.FromError(err) 129 monitor.Handled(st.Code()) 130 return nil, err 131 } 132 return &monitoredClientStream{clientStream, monitor}, nil 133 } 134} 135 136func clientStreamType(desc *grpc.StreamDesc) grpcType { 137 if desc.ClientStreams && !desc.ServerStreams { 138 return ClientStream 139 } else if !desc.ClientStreams && desc.ServerStreams { 140 return ServerStream 141 } 142 return BidiStream 143} 144 145// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters. 146type monitoredClientStream struct { 147 grpc.ClientStream 148 monitor *clientReporter 149} 150 151func (s *monitoredClientStream) SendMsg(m interface{}) error { 152 err := s.ClientStream.SendMsg(m) 153 if err == nil { 154 s.monitor.SentMessage() 155 } 156 return err 157} 158 159func (s *monitoredClientStream) RecvMsg(m interface{}) error { 160 err := s.ClientStream.RecvMsg(m) 161 if err == nil { 162 s.monitor.ReceivedMessage() 163 } else if err == io.EOF { 164 s.monitor.Handled(codes.OK) 165 } else { 166 st, _ := status.FromError(err) 167 s.monitor.Handled(st.Code()) 168 } 169 return err 170} 171