1// Copyright 2017, OpenCensus Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16package ocgrpc
17
18import (
19	"context"
20	"reflect"
21	"testing"
22
23	"google.golang.org/grpc/codes"
24	"google.golang.org/grpc/stats"
25	"google.golang.org/grpc/status"
26
27	"go.opencensus.io/metric/metricdata"
28	"go.opencensus.io/stats/view"
29	"go.opencensus.io/tag"
30	"go.opencensus.io/trace"
31)
32
33func TestServerDefaultCollections(t *testing.T) {
34	k1 := tag.MustNewKey("k1")
35	k2 := tag.MustNewKey("k2")
36
37	type tagPair struct {
38		k tag.Key
39		v string
40	}
41
42	type wantData struct {
43		v    func() *view.View
44		rows []*view.Row
45	}
46	type rpc struct {
47		tags        []tagPair
48		tagInfo     *stats.RPCTagInfo
49		inPayloads  []*stats.InPayload
50		outPayloads []*stats.OutPayload
51		end         *stats.End
52	}
53
54	type testCase struct {
55		label string
56		rpcs  []*rpc
57		wants []*wantData
58	}
59
60	tcs := []testCase{
61		{
62			"1",
63			[]*rpc{
64				{
65					[]tagPair{{k1, "v1"}},
66					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
67					[]*stats.InPayload{
68						{Length: 10},
69					},
70					[]*stats.OutPayload{
71						{Length: 10},
72					},
73					&stats.End{Error: nil},
74				},
75			},
76			[]*wantData{
77				{
78					func() *view.View { return ServerReceivedMessagesPerRPCView },
79					[]*view.Row{
80						{
81							Tags: []tag.Tag{
82								{Key: KeyServerMethod, Value: "package.service/method"},
83							},
84							Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
85						},
86					},
87				},
88				{
89					func() *view.View { return ServerSentMessagesPerRPCView },
90					[]*view.Row{
91						{
92							Tags: []tag.Tag{
93								{Key: KeyServerMethod, Value: "package.service/method"},
94							},
95							Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
96						},
97					},
98				},
99				{
100					func() *view.View { return ServerReceivedBytesPerRPCView },
101					[]*view.Row{
102						{
103							Tags: []tag.Tag{
104								{Key: KeyServerMethod, Value: "package.service/method"},
105							},
106							Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
107						},
108					},
109				},
110				{
111					func() *view.View { return ServerSentBytesPerRPCView },
112					[]*view.Row{
113						{
114							Tags: []tag.Tag{
115								{Key: KeyServerMethod, Value: "package.service/method"},
116							},
117							Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
118						},
119					},
120				},
121			},
122		},
123		{
124			"2",
125			[]*rpc{
126				{
127					[]tagPair{{k1, "v1"}},
128					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
129					[]*stats.InPayload{
130						{Length: 10},
131					},
132					[]*stats.OutPayload{
133						{Length: 10},
134						{Length: 10},
135						{Length: 10},
136					},
137					&stats.End{Error: nil},
138				},
139				{
140					[]tagPair{{k1, "v11"}},
141					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
142					[]*stats.InPayload{
143						{Length: 10},
144						{Length: 10},
145					},
146					[]*stats.OutPayload{
147						{Length: 10},
148						{Length: 10},
149					},
150					&stats.End{Error: status.Error(codes.Canceled, "canceled")},
151				},
152			},
153			[]*wantData{
154				{
155					func() *view.View { return ServerReceivedMessagesPerRPCView },
156					[]*view.Row{
157						{
158							Tags: []tag.Tag{
159								{Key: KeyServerMethod, Value: "package.service/method"},
160							},
161							Data: newDistributionData([]int64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 1, 2, 1.5, 0.5),
162						},
163					},
164				},
165				{
166					func() *view.View { return ServerSentMessagesPerRPCView },
167					[]*view.Row{
168						{
169							Tags: []tag.Tag{
170								{Key: KeyServerMethod, Value: "package.service/method"},
171							},
172							Data: newDistributionData([]int64{0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 2, 3, 2.5, 0.5),
173						},
174					},
175				},
176			},
177		},
178		{
179			"3",
180			[]*rpc{
181				{
182					[]tagPair{{k1, "v1"}},
183					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
184					[]*stats.InPayload{
185						{Length: 1},
186					},
187					[]*stats.OutPayload{
188						{Length: 1},
189						{Length: 1024},
190						{Length: 65536},
191					},
192					&stats.End{Error: nil},
193				},
194				{
195					[]tagPair{{k1, "v1"}, {k2, "v2"}},
196					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
197					[]*stats.InPayload{
198						{Length: 1024},
199					},
200					[]*stats.OutPayload{
201						{Length: 4096},
202						{Length: 16384},
203					},
204					&stats.End{Error: status.Error(codes.Aborted, "aborted")},
205				},
206				{
207					[]tagPair{{k1, "v11"}, {k2, "v22"}},
208					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
209					[]*stats.InPayload{
210						{Length: 2048},
211						{Length: 16384},
212					},
213					[]*stats.OutPayload{
214						{Length: 2048},
215						{Length: 4096},
216						{Length: 16384},
217					},
218					&stats.End{Error: status.Error(codes.Canceled, "canceled")},
219				},
220			},
221			[]*wantData{
222				{
223					func() *view.View { return ServerReceivedMessagesPerRPCView },
224					[]*view.Row{
225						{
226							Tags: []tag.Tag{
227								{Key: KeyServerMethod, Value: "package.service/method"},
228							},
229							Data: newDistributionData([]int64{0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 2, 1.333333333, 0.333333333*2),
230						},
231					},
232				},
233				{
234					func() *view.View { return ServerSentMessagesPerRPCView },
235					[]*view.Row{
236						{
237							Tags: []tag.Tag{
238								{Key: KeyServerMethod, Value: "package.service/method"},
239							},
240							Data: newDistributionData([]int64{0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 2, 3, 2.666666666, 0.333333333*2),
241						},
242					},
243				},
244				{
245					func() *view.View { return ServerReceivedBytesPerRPCView },
246					[]*view.Row{
247						{
248							Tags: []tag.Tag{
249								{Key: KeyServerMethod, Value: "package.service/method"},
250							},
251							Data: newDistributionData([]int64{1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 18432, 6485.6666667, 2.1459558466666667e+08),
252						},
253					},
254				},
255				{
256					func() *view.View { return ServerSentBytesPerRPCView },
257					[]*view.Row{
258						{
259							Tags: []tag.Tag{
260								{Key: KeyServerMethod, Value: "package.service/method"},
261							},
262							Data: newDistributionData([]int64{0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 20480, 66561, 36523, 1.355519318e+09),
263						},
264					},
265				},
266			},
267		},
268	}
269
270	views := append(DefaultServerViews[:], ServerReceivedMessagesPerRPCView, ServerSentMessagesPerRPCView)
271
272	for _, tc := range tcs {
273		if err := view.Register(views...); err != nil {
274			t.Fatal(err)
275		}
276
277		h := &ServerHandler{}
278		h.StartOptions.Sampler = trace.NeverSample()
279		for _, rpc := range tc.rpcs {
280			mods := []tag.Mutator{}
281			for _, t := range rpc.tags {
282				mods = append(mods, tag.Upsert(t.k, t.v))
283			}
284			ctx, err := tag.New(context.Background(), mods...)
285			if err != nil {
286				t.Errorf("%q: NewMap = %v", tc.label, err)
287			}
288			encoded := tag.Encode(tag.FromContext(ctx))
289			ctx = stats.SetTags(context.Background(), encoded)
290			ctx = h.TagRPC(ctx, rpc.tagInfo)
291
292			for _, in := range rpc.inPayloads {
293				h.HandleRPC(ctx, in)
294			}
295			for _, out := range rpc.outPayloads {
296				h.HandleRPC(ctx, out)
297			}
298			h.HandleRPC(ctx, rpc.end)
299		}
300
301		for _, wantData := range tc.wants {
302			gotRows, err := view.RetrieveData(wantData.v().Name)
303			if err != nil {
304				t.Errorf("%q: RetrieveData (%q) = %v", tc.label, wantData.v().Name, err)
305				continue
306			}
307
308			for i := range gotRows {
309				view.ClearStart(gotRows[i].Data)
310			}
311
312			for _, gotRow := range gotRows {
313				if !containsRow(wantData.rows, gotRow) {
314					t.Errorf("%q: unwanted row for view %q: %v", tc.label, wantData.v().Name, gotRow)
315					break
316				}
317			}
318
319			for _, wantRow := range wantData.rows {
320				if !containsRow(gotRows, wantRow) {
321					t.Errorf("%q: missing row for view %q: %v", tc.label, wantData.v().Name, wantRow)
322					break
323				}
324			}
325		}
326
327		// Unregister views to cleanup.
328		view.Unregister(views...)
329	}
330}
331
332func newDistributionData(countPerBucket []int64, count int64, min, max, mean, sumOfSquaredDev float64) *view.DistributionData {
333	return &view.DistributionData{
334		Count:           count,
335		Min:             min,
336		Max:             max,
337		Mean:            mean,
338		SumOfSquaredDev: sumOfSquaredDev,
339		CountPerBucket:  countPerBucket,
340	}
341}
342
343func TestServerRecordExemplar(t *testing.T) {
344	key := tag.MustNewKey("test_key")
345	tagInfo := &stats.RPCTagInfo{FullMethodName: "/package.service/method"}
346	out := &stats.OutPayload{Length: 2000}
347	end := &stats.End{Error: nil}
348
349	if err := view.Register(ServerSentBytesPerRPCView); err != nil {
350		t.Error(err)
351	}
352	h := &ServerHandler{}
353	h.StartOptions.Sampler = trace.AlwaysSample()
354	ctx, err := tag.New(context.Background(), tag.Upsert(key, "test_val"))
355	if err != nil {
356		t.Error(err)
357	}
358	encoded := tag.Encode(tag.FromContext(ctx))
359	ctx = stats.SetTags(context.Background(), encoded)
360	ctx = h.TagRPC(ctx, tagInfo)
361
362	out.Client = false
363	h.HandleRPC(ctx, out)
364	end.Client = false
365	h.HandleRPC(ctx, end)
366
367	span := trace.FromContext(ctx)
368	if span == nil {
369		t.Fatal("expected non-nil span, got nil")
370	}
371	if !span.IsRecordingEvents() {
372		t.Errorf("span should be sampled")
373	}
374	attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: span.SpanContext()}
375	wantExemplar := &metricdata.Exemplar{Value: 2000, Attachments: attachments}
376
377	rows, err := view.RetrieveData(ServerSentBytesPerRPCView.Name)
378	if err != nil {
379		t.Fatal("Error RetrieveData ", err)
380	}
381	if len(rows) == 0 {
382		t.Fatal("No data was recorded.")
383	}
384	data := rows[0].Data
385	dis, ok := data.(*view.DistributionData)
386	if !ok {
387		t.Fatal("want DistributionData, got ", data)
388	}
389	// Only recorded value is 2000, which falls into the second bucket (1024, 2048].
390	wantBuckets := []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
391	if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) {
392		t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket)
393	}
394	for i, e := range dis.ExemplarsPerBucket {
395		// Only the second bucket should have an exemplar.
396		if i == 1 {
397			if diff := cmpExemplar(e, wantExemplar); diff != "" {
398				t.Fatalf("Unexpected Exemplar -got +want: %s", diff)
399			}
400		} else if e != nil {
401			t.Errorf("want nil exemplar, got %v", e)
402		}
403	}
404
405	// Unregister views to cleanup.
406	view.Unregister(ServerSentBytesPerRPCView)
407}
408