1package grpc_prometheus 2 3import ( 4 prom "github.com/prometheus/client_golang/prometheus" 5 "golang.org/x/net/context" 6 "google.golang.org/grpc" 7 "google.golang.org/grpc/status" 8) 9 10// ServerMetrics represents a collection of metrics to be registered on a 11// Prometheus metrics registry for a gRPC server. 12type ServerMetrics struct { 13 serverStartedCounter *prom.CounterVec 14 serverHandledCounter *prom.CounterVec 15 serverStreamMsgReceived *prom.CounterVec 16 serverStreamMsgSent *prom.CounterVec 17 serverHandledHistogramEnabled bool 18 serverHandledHistogramOpts prom.HistogramOpts 19 serverHandledHistogram *prom.HistogramVec 20} 21 22// NewServerMetrics returns a ServerMetrics object. Use a new instance of 23// ServerMetrics when not using the default Prometheus metrics registry, for 24// example when wanting to control which metrics are added to a registry as 25// opposed to automatically adding metrics via init functions. 26func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics { 27 opts := counterOptions(counterOpts) 28 return &ServerMetrics{ 29 serverStartedCounter: prom.NewCounterVec( 30 opts.apply(prom.CounterOpts{ 31 Name: "grpc_server_started_total", 32 Help: "Total number of RPCs started on the server.", 33 }), []string{"grpc_type", "grpc_service", "grpc_method"}), 34 serverHandledCounter: prom.NewCounterVec( 35 opts.apply(prom.CounterOpts{ 36 Name: "grpc_server_handled_total", 37 Help: "Total number of RPCs completed on the server, regardless of success or failure.", 38 }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}), 39 serverStreamMsgReceived: prom.NewCounterVec( 40 opts.apply(prom.CounterOpts{ 41 Name: "grpc_server_msg_received_total", 42 Help: "Total number of RPC stream messages received on the server.", 43 }), []string{"grpc_type", "grpc_service", "grpc_method"}), 44 serverStreamMsgSent: prom.NewCounterVec( 45 opts.apply(prom.CounterOpts{ 46 Name: "grpc_server_msg_sent_total", 47 Help: "Total number of gRPC stream messages sent by the server.", 48 }), []string{"grpc_type", "grpc_service", "grpc_method"}), 49 serverHandledHistogramEnabled: false, 50 serverHandledHistogramOpts: prom.HistogramOpts{ 51 Name: "grpc_server_handling_seconds", 52 Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.", 53 Buckets: prom.DefBuckets, 54 }, 55 serverHandledHistogram: nil, 56 } 57} 58 59// EnableHandlingTimeHistogram enables histograms being registered when 60// registering the ServerMetrics on a Prometheus registry. Histograms can be 61// expensive on Prometheus servers. It takes options to configure histogram 62// options such as the defined buckets. 63func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) { 64 for _, o := range opts { 65 o(&m.serverHandledHistogramOpts) 66 } 67 if !m.serverHandledHistogramEnabled { 68 m.serverHandledHistogram = prom.NewHistogramVec( 69 m.serverHandledHistogramOpts, 70 []string{"grpc_type", "grpc_service", "grpc_method"}, 71 ) 72 } 73 m.serverHandledHistogramEnabled = true 74} 75 76// Describe sends the super-set of all possible descriptors of metrics 77// collected by this Collector to the provided channel and returns once 78// the last descriptor has been sent. 79func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) { 80 m.serverStartedCounter.Describe(ch) 81 m.serverHandledCounter.Describe(ch) 82 m.serverStreamMsgReceived.Describe(ch) 83 m.serverStreamMsgSent.Describe(ch) 84 if m.serverHandledHistogramEnabled { 85 m.serverHandledHistogram.Describe(ch) 86 } 87} 88 89// Collect is called by the Prometheus registry when collecting 90// metrics. The implementation sends each collected metric via the 91// provided channel and returns once the last metric has been sent. 92func (m *ServerMetrics) Collect(ch chan<- prom.Metric) { 93 m.serverStartedCounter.Collect(ch) 94 m.serverHandledCounter.Collect(ch) 95 m.serverStreamMsgReceived.Collect(ch) 96 m.serverStreamMsgSent.Collect(ch) 97 if m.serverHandledHistogramEnabled { 98 m.serverHandledHistogram.Collect(ch) 99 } 100} 101 102// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs. 103func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { 104 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { 105 monitor := newServerReporter(m, Unary, info.FullMethod) 106 monitor.ReceivedMessage() 107 resp, err := handler(ctx, req) 108 st, _ := status.FromError(err) 109 monitor.Handled(st.Code()) 110 if err == nil { 111 monitor.SentMessage() 112 } 113 return resp, err 114 } 115} 116 117// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs. 118func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 119 return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 120 monitor := newServerReporter(m, streamRPCType(info), info.FullMethod) 121 err := handler(srv, &monitoredServerStream{ss, monitor}) 122 st, _ := status.FromError(err) 123 monitor.Handled(st.Code()) 124 return err 125 } 126} 127 128// InitializeMetrics initializes all metrics, with their appropriate null 129// value, for all gRPC methods registered on a gRPC server. This is useful, to 130// ensure that all metrics exist when collecting and querying. 131func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) { 132 serviceInfo := server.GetServiceInfo() 133 for serviceName, info := range serviceInfo { 134 for _, mInfo := range info.Methods { 135 preRegisterMethod(m, serviceName, &mInfo) 136 } 137 } 138} 139 140func streamRPCType(info *grpc.StreamServerInfo) grpcType { 141 if info.IsClientStream && !info.IsServerStream { 142 return ClientStream 143 } else if !info.IsClientStream && info.IsServerStream { 144 return ServerStream 145 } 146 return BidiStream 147} 148 149// monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters. 150type monitoredServerStream struct { 151 grpc.ServerStream 152 monitor *serverReporter 153} 154 155func (s *monitoredServerStream) SendMsg(m interface{}) error { 156 err := s.ServerStream.SendMsg(m) 157 if err == nil { 158 s.monitor.SentMessage() 159 } 160 return err 161} 162 163func (s *monitoredServerStream) RecvMsg(m interface{}) error { 164 err := s.ServerStream.RecvMsg(m) 165 if err == nil { 166 s.monitor.ReceivedMessage() 167 } 168 return err 169} 170 171// preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated. 172func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) { 173 methodName := mInfo.Name 174 methodType := string(typeFromMethodInfo(mInfo)) 175 // These are just references (no increments), as just referencing will create the labels but not set values. 176 metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName) 177 metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName) 178 metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName) 179 if metrics.serverHandledHistogramEnabled { 180 metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName) 181 } 182 for _, code := range allCodes { 183 metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String()) 184 } 185} 186