1// Copyright 2017, OpenCensus Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14// 15 16package ocgrpc 17 18import ( 19 "context" 20 "reflect" 21 "testing" 22 23 "google.golang.org/grpc/codes" 24 "google.golang.org/grpc/stats" 25 "google.golang.org/grpc/status" 26 27 "go.opencensus.io/metric/metricdata" 28 "go.opencensus.io/stats/view" 29 "go.opencensus.io/tag" 30 "go.opencensus.io/trace" 31) 32 33func TestServerDefaultCollections(t *testing.T) { 34 k1 := tag.MustNewKey("k1") 35 k2 := tag.MustNewKey("k2") 36 37 type tagPair struct { 38 k tag.Key 39 v string 40 } 41 42 type wantData struct { 43 v func() *view.View 44 rows []*view.Row 45 } 46 type rpc struct { 47 tags []tagPair 48 tagInfo *stats.RPCTagInfo 49 inPayloads []*stats.InPayload 50 outPayloads []*stats.OutPayload 51 end *stats.End 52 } 53 54 type testCase struct { 55 label string 56 rpcs []*rpc 57 wants []*wantData 58 } 59 60 tcs := []testCase{ 61 { 62 "1", 63 []*rpc{ 64 { 65 []tagPair{{k1, "v1"}}, 66 &stats.RPCTagInfo{FullMethodName: "/package.service/method"}, 67 []*stats.InPayload{ 68 {Length: 10}, 69 }, 70 []*stats.OutPayload{ 71 {Length: 10}, 72 }, 73 &stats.End{Error: nil}, 74 }, 75 }, 76 []*wantData{ 77 { 78 func() *view.View { return ServerReceivedMessagesPerRPCView }, 79 []*view.Row{ 80 { 81 Tags: []tag.Tag{ 82 {Key: KeyServerMethod, Value: "package.service/method"}, 83 }, 84 Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0), 85 }, 86 }, 87 }, 88 { 89 func() *view.View { return ServerSentMessagesPerRPCView }, 90 []*view.Row{ 91 { 92 Tags: []tag.Tag{ 93 {Key: KeyServerMethod, Value: "package.service/method"}, 94 }, 95 Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0), 96 }, 97 }, 98 }, 99 { 100 func() *view.View { return ServerReceivedBytesPerRPCView }, 101 []*view.Row{ 102 { 103 Tags: []tag.Tag{ 104 {Key: KeyServerMethod, Value: "package.service/method"}, 105 }, 106 Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0), 107 }, 108 }, 109 }, 110 { 111 func() *view.View { return ServerSentBytesPerRPCView }, 112 []*view.Row{ 113 { 114 Tags: []tag.Tag{ 115 {Key: KeyServerMethod, Value: "package.service/method"}, 116 }, 117 Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0), 118 }, 119 }, 120 }, 121 }, 122 }, 123 { 124 "2", 125 []*rpc{ 126 { 127 []tagPair{{k1, "v1"}}, 128 &stats.RPCTagInfo{FullMethodName: "/package.service/method"}, 129 []*stats.InPayload{ 130 {Length: 10}, 131 }, 132 []*stats.OutPayload{ 133 {Length: 10}, 134 {Length: 10}, 135 {Length: 10}, 136 }, 137 &stats.End{Error: nil}, 138 }, 139 { 140 []tagPair{{k1, "v11"}}, 141 &stats.RPCTagInfo{FullMethodName: "/package.service/method"}, 142 []*stats.InPayload{ 143 {Length: 10}, 144 {Length: 10}, 145 }, 146 []*stats.OutPayload{ 147 {Length: 10}, 148 {Length: 10}, 149 }, 150 &stats.End{Error: status.Error(codes.Canceled, "canceled")}, 151 }, 152 }, 153 []*wantData{ 154 { 155 func() *view.View { return ServerReceivedMessagesPerRPCView }, 156 []*view.Row{ 157 { 158 Tags: []tag.Tag{ 159 {Key: KeyServerMethod, Value: "package.service/method"}, 160 }, 161 Data: newDistributionData([]int64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 1, 2, 1.5, 0.5), 162 }, 163 }, 164 }, 165 { 166 func() *view.View { return ServerSentMessagesPerRPCView }, 167 []*view.Row{ 168 { 169 Tags: []tag.Tag{ 170 {Key: KeyServerMethod, Value: "package.service/method"}, 171 }, 172 Data: newDistributionData([]int64{0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 2, 3, 2.5, 0.5), 173 }, 174 }, 175 }, 176 }, 177 }, 178 { 179 "3", 180 []*rpc{ 181 { 182 []tagPair{{k1, "v1"}}, 183 &stats.RPCTagInfo{FullMethodName: "/package.service/method"}, 184 []*stats.InPayload{ 185 {Length: 1}, 186 }, 187 []*stats.OutPayload{ 188 {Length: 1}, 189 {Length: 1024}, 190 {Length: 65536}, 191 }, 192 &stats.End{Error: nil}, 193 }, 194 { 195 []tagPair{{k1, "v1"}, {k2, "v2"}}, 196 &stats.RPCTagInfo{FullMethodName: "/package.service/method"}, 197 []*stats.InPayload{ 198 {Length: 1024}, 199 }, 200 []*stats.OutPayload{ 201 {Length: 4096}, 202 {Length: 16384}, 203 }, 204 &stats.End{Error: status.Error(codes.Aborted, "aborted")}, 205 }, 206 { 207 []tagPair{{k1, "v11"}, {k2, "v22"}}, 208 &stats.RPCTagInfo{FullMethodName: "/package.service/method"}, 209 []*stats.InPayload{ 210 {Length: 2048}, 211 {Length: 16384}, 212 }, 213 []*stats.OutPayload{ 214 {Length: 2048}, 215 {Length: 4096}, 216 {Length: 16384}, 217 }, 218 &stats.End{Error: status.Error(codes.Canceled, "canceled")}, 219 }, 220 }, 221 []*wantData{ 222 { 223 func() *view.View { return ServerReceivedMessagesPerRPCView }, 224 []*view.Row{ 225 { 226 Tags: []tag.Tag{ 227 {Key: KeyServerMethod, Value: "package.service/method"}, 228 }, 229 Data: newDistributionData([]int64{0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 2, 1.333333333, 0.333333333*2), 230 }, 231 }, 232 }, 233 { 234 func() *view.View { return ServerSentMessagesPerRPCView }, 235 []*view.Row{ 236 { 237 Tags: []tag.Tag{ 238 {Key: KeyServerMethod, Value: "package.service/method"}, 239 }, 240 Data: newDistributionData([]int64{0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 2, 3, 2.666666666, 0.333333333*2), 241 }, 242 }, 243 }, 244 { 245 func() *view.View { return ServerReceivedBytesPerRPCView }, 246 []*view.Row{ 247 { 248 Tags: []tag.Tag{ 249 {Key: KeyServerMethod, Value: "package.service/method"}, 250 }, 251 Data: newDistributionData([]int64{1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 18432, 6485.6666667, 2.1459558466666667e+08), 252 }, 253 }, 254 }, 255 { 256 func() *view.View { return ServerSentBytesPerRPCView }, 257 []*view.Row{ 258 { 259 Tags: []tag.Tag{ 260 {Key: KeyServerMethod, Value: "package.service/method"}, 261 }, 262 Data: newDistributionData([]int64{0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 20480, 66561, 36523, 1.355519318e+09), 263 }, 264 }, 265 }, 266 }, 267 }, 268 } 269 270 views := append(DefaultServerViews[:], ServerReceivedMessagesPerRPCView, ServerSentMessagesPerRPCView) 271 272 for _, tc := range tcs { 273 if err := view.Register(views...); err != nil { 274 t.Fatal(err) 275 } 276 277 h := &ServerHandler{} 278 h.StartOptions.Sampler = trace.NeverSample() 279 for _, rpc := range tc.rpcs { 280 mods := []tag.Mutator{} 281 for _, t := range rpc.tags { 282 mods = append(mods, tag.Upsert(t.k, t.v)) 283 } 284 ctx, err := tag.New(context.Background(), mods...) 285 if err != nil { 286 t.Errorf("%q: NewMap = %v", tc.label, err) 287 } 288 encoded := tag.Encode(tag.FromContext(ctx)) 289 ctx = stats.SetTags(context.Background(), encoded) 290 ctx = h.TagRPC(ctx, rpc.tagInfo) 291 292 for _, in := range rpc.inPayloads { 293 h.HandleRPC(ctx, in) 294 } 295 for _, out := range rpc.outPayloads { 296 h.HandleRPC(ctx, out) 297 } 298 h.HandleRPC(ctx, rpc.end) 299 } 300 301 for _, wantData := range tc.wants { 302 gotRows, err := view.RetrieveData(wantData.v().Name) 303 if err != nil { 304 t.Errorf("%q: RetrieveData (%q) = %v", tc.label, wantData.v().Name, err) 305 continue 306 } 307 308 for i := range gotRows { 309 view.ClearStart(gotRows[i].Data) 310 } 311 312 for _, gotRow := range gotRows { 313 if !containsRow(wantData.rows, gotRow) { 314 t.Errorf("%q: unwanted row for view %q: %v", tc.label, wantData.v().Name, gotRow) 315 break 316 } 317 } 318 319 for _, wantRow := range wantData.rows { 320 if !containsRow(gotRows, wantRow) { 321 t.Errorf("%q: missing row for view %q: %v", tc.label, wantData.v().Name, wantRow) 322 break 323 } 324 } 325 } 326 327 // Unregister views to cleanup. 328 view.Unregister(views...) 329 } 330} 331 332func newDistributionData(countPerBucket []int64, count int64, min, max, mean, sumOfSquaredDev float64) *view.DistributionData { 333 return &view.DistributionData{ 334 Count: count, 335 Min: min, 336 Max: max, 337 Mean: mean, 338 SumOfSquaredDev: sumOfSquaredDev, 339 CountPerBucket: countPerBucket, 340 } 341} 342 343func TestServerRecordExemplar(t *testing.T) { 344 key := tag.MustNewKey("test_key") 345 tagInfo := &stats.RPCTagInfo{FullMethodName: "/package.service/method"} 346 out := &stats.OutPayload{Length: 2000} 347 end := &stats.End{Error: nil} 348 349 if err := view.Register(ServerSentBytesPerRPCView); err != nil { 350 t.Error(err) 351 } 352 h := &ServerHandler{} 353 h.StartOptions.Sampler = trace.AlwaysSample() 354 ctx, err := tag.New(context.Background(), tag.Upsert(key, "test_val")) 355 if err != nil { 356 t.Error(err) 357 } 358 encoded := tag.Encode(tag.FromContext(ctx)) 359 ctx = stats.SetTags(context.Background(), encoded) 360 ctx = h.TagRPC(ctx, tagInfo) 361 362 out.Client = false 363 h.HandleRPC(ctx, out) 364 end.Client = false 365 h.HandleRPC(ctx, end) 366 367 span := trace.FromContext(ctx) 368 if span == nil { 369 t.Fatal("expected non-nil span, got nil") 370 } 371 if !span.IsRecordingEvents() { 372 t.Errorf("span should be sampled") 373 } 374 attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: span.SpanContext()} 375 wantExemplar := &metricdata.Exemplar{Value: 2000, Attachments: attachments} 376 377 rows, err := view.RetrieveData(ServerSentBytesPerRPCView.Name) 378 if err != nil { 379 t.Fatal("Error RetrieveData ", err) 380 } 381 if len(rows) == 0 { 382 t.Fatal("No data was recorded.") 383 } 384 data := rows[0].Data 385 dis, ok := data.(*view.DistributionData) 386 if !ok { 387 t.Fatal("want DistributionData, got ", data) 388 } 389 // Only recorded value is 2000, which falls into the second bucket (1024, 2048]. 390 wantBuckets := []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 391 if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) { 392 t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket) 393 } 394 for i, e := range dis.ExemplarsPerBucket { 395 // Only the second bucket should have an exemplar. 396 if i == 1 { 397 if diff := cmpExemplar(e, wantExemplar); diff != "" { 398 t.Fatalf("Unexpected Exemplar -got +want: %s", diff) 399 } 400 } else if e != nil { 401 t.Errorf("want nil exemplar, got %v", e) 402 } 403 } 404 405 // Unregister views to cleanup. 406 view.Unregister(ServerSentBytesPerRPCView) 407} 408