// Copyright 2017, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package ocgrpc

import (
	"context"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"go.opencensus.io/metric/metricdata"
	ocstats "go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
	"go.opencensus.io/trace"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

// grpcInstrumentationKey is the type of the context key under which this
// package's per-RPC data is looked up. Using a dedicated type keeps the key
// from colliding with string keys set by other packages.
type grpcInstrumentationKey string

// rpcData holds the instrumentation RPC data that is needed between the start
// and end of a call. It holds the info that this package needs to keep track
// of between the various GRPC events.
type rpcData struct {
	// sentCount, sentBytes, recvCount and recvBytes have to be the first
	// words of the struct in order to be 64-bit aligned on 32-bit
	// architectures, because they are read and updated with sync/atomic.
	// Do not reorder them or put other fields in front of them.
	sentCount, sentBytes, recvCount, recvBytes int64 // access atomically

	// startTime represents the time at which TagRPC was invoked at the
	// beginning of an RPC. It is an approximation of the time when the
	// application code invoked GRPC code.
	startTime time.Time
	// method is the RPC method name. Leading slashes are trimmed from it
	// (see methodName) before it is recorded as the client method tag.
	method string
}

// The following variables define the default hard-coded auxiliary data used by
// both the default GRPC client and GRPC server metrics.
var (
	// DefaultBytesDistribution is a distribution whose bucket boundaries
	// run from 1024 (1 KiB) to 4294967296 (4 GiB).
	DefaultBytesDistribution = view.Distribution(1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296)
	// DefaultMillisecondsDistribution is a distribution whose bucket
	// boundaries run from 0.01 to 100000; going by its name the unit is
	// milliseconds (i.e. 10 microseconds up to 100 seconds).
	DefaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
	// DefaultMessageCountDistribution is a distribution whose bucket
	// boundaries are the powers of two from 1 to 65536.
	DefaultMessageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
)

// Server tags are applied to the context used to process each RPC, as well as
// the measures at the end of each RPC.
//
// The error returned by tag.NewKey is deliberately discarded: the key names
// are fixed literals.
// NOTE(review): this assumes the literals always pass tag.NewKey's name
// validation — confirm against the tag package.
var (
	// KeyServerMethod is the tag key named "grpc_server_method".
	KeyServerMethod, _ = tag.NewKey("grpc_server_method")
	// KeyServerStatus is the tag key named "grpc_server_status"; it is set
	// to the RPC's status string when a server-side RPC ends.
	KeyServerStatus, _ = tag.NewKey("grpc_server_status")
)

// Client tags are applied to measures at the end of each RPC.
//
// As with the server keys, the tag.NewKey error is discarded because the key
// names are fixed literals.
var (
	// KeyClientMethod is the tag key named "grpc_client_method"; it is set
	// to the trimmed method name when a client-side RPC ends.
	KeyClientMethod, _ = tag.NewKey("grpc_client_method")
	// KeyClientStatus is the tag key named "grpc_client_status"; it is set
	// to the RPC's status string when a client-side RPC ends.
	KeyClientStatus, _ = tag.NewKey("grpc_client_status")
)

var (
	// rpcDataKey is the context key under which the handlers in this file
	// look up the *rpcData of the current RPC.
	rpcDataKey = grpcInstrumentationKey("opencensus-rpcData")
)

// methodName returns fullname with all leading '/' characters removed.
//
// strings.TrimLeft treats its second argument as a set of characters, so
// every leading slash is stripped, not just the first one: "/pkg.Svc/Method"
// becomes "pkg.Svc/Method" and "//x" becomes "x". Slashes elsewhere in the
// name are left untouched.
func methodName(fullname string) string {
	return strings.TrimLeft(fullname, "/")
}

// statsHandleRPC processes the RPC events.
83func statsHandleRPC(ctx context.Context, s stats.RPCStats) { 84 switch st := s.(type) { 85 case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer: 86 // do nothing for client 87 case *stats.OutPayload: 88 handleRPCOutPayload(ctx, st) 89 case *stats.InPayload: 90 handleRPCInPayload(ctx, st) 91 case *stats.End: 92 handleRPCEnd(ctx, st) 93 default: 94 grpclog.Infof("unexpected stats: %T", st) 95 } 96} 97 98func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) { 99 d, ok := ctx.Value(rpcDataKey).(*rpcData) 100 if !ok { 101 if grpclog.V(2) { 102 grpclog.Infoln("Failed to retrieve *rpcData from context.") 103 } 104 return 105 } 106 107 atomic.AddInt64(&d.sentBytes, int64(s.Length)) 108 atomic.AddInt64(&d.sentCount, 1) 109} 110 111func handleRPCInPayload(ctx context.Context, s *stats.InPayload) { 112 d, ok := ctx.Value(rpcDataKey).(*rpcData) 113 if !ok { 114 if grpclog.V(2) { 115 grpclog.Infoln("Failed to retrieve *rpcData from context.") 116 } 117 return 118 } 119 120 atomic.AddInt64(&d.recvBytes, int64(s.Length)) 121 atomic.AddInt64(&d.recvCount, 1) 122} 123 124func handleRPCEnd(ctx context.Context, s *stats.End) { 125 d, ok := ctx.Value(rpcDataKey).(*rpcData) 126 if !ok { 127 if grpclog.V(2) { 128 grpclog.Infoln("Failed to retrieve *rpcData from context.") 129 } 130 return 131 } 132 133 elapsedTime := time.Since(d.startTime) 134 135 var st string 136 if s.Error != nil { 137 s, ok := status.FromError(s.Error) 138 if ok { 139 st = statusCodeToString(s) 140 } 141 } else { 142 st = "OK" 143 } 144 145 latencyMillis := float64(elapsedTime) / float64(time.Millisecond) 146 attachments := getSpanCtxAttachment(ctx) 147 if s.Client { 148 ocstats.RecordWithOptions(ctx, 149 ocstats.WithTags( 150 tag.Upsert(KeyClientMethod, methodName(d.method)), 151 tag.Upsert(KeyClientStatus, st)), 152 ocstats.WithAttachments(attachments), 153 ocstats.WithMeasurements( 154 ClientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)), 155 
ClientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)), 156 ClientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)), 157 ClientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)), 158 ClientRoundtripLatency.M(latencyMillis))) 159 } else { 160 ocstats.RecordWithOptions(ctx, 161 ocstats.WithTags( 162 tag.Upsert(KeyServerStatus, st), 163 ), 164 ocstats.WithAttachments(attachments), 165 ocstats.WithMeasurements( 166 ServerSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)), 167 ServerSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)), 168 ServerReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)), 169 ServerReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)), 170 ServerLatency.M(latencyMillis))) 171 } 172} 173 174func statusCodeToString(s *status.Status) string { 175 // see https://github.com/grpc/grpc/blob/master/doc/statuscodes.md 176 switch c := s.Code(); c { 177 case codes.OK: 178 return "OK" 179 case codes.Canceled: 180 return "CANCELLED" 181 case codes.Unknown: 182 return "UNKNOWN" 183 case codes.InvalidArgument: 184 return "INVALID_ARGUMENT" 185 case codes.DeadlineExceeded: 186 return "DEADLINE_EXCEEDED" 187 case codes.NotFound: 188 return "NOT_FOUND" 189 case codes.AlreadyExists: 190 return "ALREADY_EXISTS" 191 case codes.PermissionDenied: 192 return "PERMISSION_DENIED" 193 case codes.ResourceExhausted: 194 return "RESOURCE_EXHAUSTED" 195 case codes.FailedPrecondition: 196 return "FAILED_PRECONDITION" 197 case codes.Aborted: 198 return "ABORTED" 199 case codes.OutOfRange: 200 return "OUT_OF_RANGE" 201 case codes.Unimplemented: 202 return "UNIMPLEMENTED" 203 case codes.Internal: 204 return "INTERNAL" 205 case codes.Unavailable: 206 return "UNAVAILABLE" 207 case codes.DataLoss: 208 return "DATA_LOSS" 209 case codes.Unauthenticated: 210 return "UNAUTHENTICATED" 211 default: 212 return "CODE_" + strconv.FormatInt(int64(c), 10) 213 } 214} 215 216func getSpanCtxAttachment(ctx context.Context) metricdata.Attachments { 217 attachments := 
map[string]interface{}{} 218 span := trace.FromContext(ctx) 219 if span == nil { 220 return attachments 221 } 222 spanCtx := span.SpanContext() 223 if spanCtx.IsSampled() { 224 attachments[metricdata.AttachmentKeySpanContext] = spanCtx 225 } 226 return attachments 227} 228