1// Copyright 2017, OpenCensus Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16package ocgrpc
17
18import (
19	"context"
20	"strconv"
21	"strings"
22	"sync/atomic"
23	"time"
24
25	"go.opencensus.io/metric/metricdata"
26	ocstats "go.opencensus.io/stats"
27	"go.opencensus.io/stats/view"
28	"go.opencensus.io/tag"
29	"go.opencensus.io/trace"
30	"google.golang.org/grpc/codes"
31	"google.golang.org/grpc/grpclog"
32	"google.golang.org/grpc/stats"
33	"google.golang.org/grpc/status"
34)
35
// grpcInstrumentationKey is the type of the context key under which this
// package stores its per-RPC *rpcData. Because it is an unexported named type,
// a key of this type can never equal a key created by another package, even
// if the underlying strings match.
type grpcInstrumentationKey string

// rpcData holds the instrumentation RPC data that is needed between the start
// and end of a call. It holds the info that this package needs to keep track
// of between the various gRPC events.
type rpcData struct {
	// sentCount, sentBytes, recvCount and recvBytes have to be the first words
	// of the struct so that they are 64-bit aligned on 32-bit architectures,
	// which the sync/atomic 64-bit functions require. Do not reorder them, and
	// access them only atomically.
	sentCount, sentBytes, recvCount, recvBytes int64

	// startTime represents the time at which TagRPC was invoked at the
	// beginning of an RPC. It is an approximation of the time when the
	// application code invoked gRPC code.
	startTime time.Time

	// method is the RPC's method name; methodName strips its leading '/'
	// before it is used as a tag value.
	method string
}
52
53// The following variables define the default hard-coded auxiliary data used by
54// both the default GRPC client and GRPC server metrics.
55var (
56	DefaultBytesDistribution        = view.Distribution(1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296)
57	DefaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
58	DefaultMessageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
59)
60
61// Server tags are applied to the context used to process each RPC, as well as
62// the measures at the end of each RPC.
63var (
64	KeyServerMethod, _ = tag.NewKey("grpc_server_method")
65	KeyServerStatus, _ = tag.NewKey("grpc_server_status")
66)
67
68// Client tags are applied to measures at the end of each RPC.
69var (
70	KeyClientMethod, _ = tag.NewKey("grpc_client_method")
71	KeyClientStatus, _ = tag.NewKey("grpc_client_status")
72)
73
74var (
75	rpcDataKey = grpcInstrumentationKey("opencensus-rpcData")
76)
77
// methodName returns fullname with every leading '/' removed, turning a gRPC
// full method such as "/pkg.Service/Method" into "pkg.Service/Method".
func methodName(fullname string) string {
	name := fullname
	for strings.HasPrefix(name, "/") {
		name = name[1:]
	}
	return name
}
81
82// statsHandleRPC processes the RPC events.
83func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
84	switch st := s.(type) {
85	case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
86		// do nothing for client
87	case *stats.OutPayload:
88		handleRPCOutPayload(ctx, st)
89	case *stats.InPayload:
90		handleRPCInPayload(ctx, st)
91	case *stats.End:
92		handleRPCEnd(ctx, st)
93	default:
94		grpclog.Infof("unexpected stats: %T", st)
95	}
96}
97
98func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
99	d, ok := ctx.Value(rpcDataKey).(*rpcData)
100	if !ok {
101		if grpclog.V(2) {
102			grpclog.Infoln("Failed to retrieve *rpcData from context.")
103		}
104		return
105	}
106
107	atomic.AddInt64(&d.sentBytes, int64(s.Length))
108	atomic.AddInt64(&d.sentCount, 1)
109}
110
111func handleRPCInPayload(ctx context.Context, s *stats.InPayload) {
112	d, ok := ctx.Value(rpcDataKey).(*rpcData)
113	if !ok {
114		if grpclog.V(2) {
115			grpclog.Infoln("Failed to retrieve *rpcData from context.")
116		}
117		return
118	}
119
120	atomic.AddInt64(&d.recvBytes, int64(s.Length))
121	atomic.AddInt64(&d.recvCount, 1)
122}
123
124func handleRPCEnd(ctx context.Context, s *stats.End) {
125	d, ok := ctx.Value(rpcDataKey).(*rpcData)
126	if !ok {
127		if grpclog.V(2) {
128			grpclog.Infoln("Failed to retrieve *rpcData from context.")
129		}
130		return
131	}
132
133	elapsedTime := time.Since(d.startTime)
134
135	var st string
136	if s.Error != nil {
137		s, ok := status.FromError(s.Error)
138		if ok {
139			st = statusCodeToString(s)
140		}
141	} else {
142		st = "OK"
143	}
144
145	latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
146	attachments := getSpanCtxAttachment(ctx)
147	if s.Client {
148		ocstats.RecordWithOptions(ctx,
149			ocstats.WithTags(
150				tag.Upsert(KeyClientMethod, methodName(d.method)),
151				tag.Upsert(KeyClientStatus, st)),
152			ocstats.WithAttachments(attachments),
153			ocstats.WithMeasurements(
154				ClientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
155				ClientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
156				ClientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
157				ClientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
158				ClientRoundtripLatency.M(latencyMillis)))
159	} else {
160		ocstats.RecordWithOptions(ctx,
161			ocstats.WithTags(
162				tag.Upsert(KeyServerStatus, st),
163			),
164			ocstats.WithAttachments(attachments),
165			ocstats.WithMeasurements(
166				ServerSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
167				ServerSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
168				ServerReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
169				ServerReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
170				ServerLatency.M(latencyMillis)))
171	}
172}
173
174func statusCodeToString(s *status.Status) string {
175	// see https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
176	switch c := s.Code(); c {
177	case codes.OK:
178		return "OK"
179	case codes.Canceled:
180		return "CANCELLED"
181	case codes.Unknown:
182		return "UNKNOWN"
183	case codes.InvalidArgument:
184		return "INVALID_ARGUMENT"
185	case codes.DeadlineExceeded:
186		return "DEADLINE_EXCEEDED"
187	case codes.NotFound:
188		return "NOT_FOUND"
189	case codes.AlreadyExists:
190		return "ALREADY_EXISTS"
191	case codes.PermissionDenied:
192		return "PERMISSION_DENIED"
193	case codes.ResourceExhausted:
194		return "RESOURCE_EXHAUSTED"
195	case codes.FailedPrecondition:
196		return "FAILED_PRECONDITION"
197	case codes.Aborted:
198		return "ABORTED"
199	case codes.OutOfRange:
200		return "OUT_OF_RANGE"
201	case codes.Unimplemented:
202		return "UNIMPLEMENTED"
203	case codes.Internal:
204		return "INTERNAL"
205	case codes.Unavailable:
206		return "UNAVAILABLE"
207	case codes.DataLoss:
208		return "DATA_LOSS"
209	case codes.Unauthenticated:
210		return "UNAUTHENTICATED"
211	default:
212		return "CODE_" + strconv.FormatInt(int64(c), 10)
213	}
214}
215
216func getSpanCtxAttachment(ctx context.Context) metricdata.Attachments {
217	attachments := map[string]interface{}{}
218	span := trace.FromContext(ctx)
219	if span == nil {
220		return attachments
221	}
222	spanCtx := span.SpanContext()
223	if spanCtx.IsSampled() {
224		attachments[metricdata.AttachmentKeySpanContext] = spanCtx
225	}
226	return attachments
227}
228