1/* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package stats_test 20 21import ( 22 "context" 23 "fmt" 24 "io" 25 "net" 26 "reflect" 27 "sync" 28 "testing" 29 "time" 30 31 "github.com/golang/protobuf/proto" 32 "google.golang.org/grpc" 33 "google.golang.org/grpc/metadata" 34 "google.golang.org/grpc/stats" 35 testpb "google.golang.org/grpc/stats/grpc_testing" 36 "google.golang.org/grpc/status" 37) 38 39func init() { 40 grpc.EnableTracing = false 41} 42 43type connCtxKey struct{} 44type rpcCtxKey struct{} 45 46var ( 47 // For headers sent to server: 48 testMetadata = metadata.MD{ 49 "key1": []string{"value1"}, 50 "key2": []string{"value2"}, 51 } 52 // For headers sent from server: 53 testHeaderMetadata = metadata.MD{ 54 "hkey1": []string{"headerValue1"}, 55 "hkey2": []string{"headerValue2"}, 56 } 57 // For trailers sent from server: 58 testTrailerMetadata = metadata.MD{ 59 "tkey1": []string{"trailerValue1"}, 60 "tkey2": []string{"trailerValue2"}, 61 } 62 // The id for which the service handler should return error. 63 errorID int32 = 32202 64) 65 66type testServer struct { 67 testpb.UnimplementedTestServiceServer 68} 69 70func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 71 if err := grpc.SendHeader(ctx, testHeaderMetadata); err != nil { 72 return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testHeaderMetadata, err) 73 } 74 if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil { 75 return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err) 76 } 77 78 if in.Id == errorID { 79 return nil, fmt.Errorf("got error id: %v", in.Id) 80 } 81 82 return &testpb.SimpleResponse{Id: in.Id}, nil 83} 84 85func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 86 if err := stream.SendHeader(testHeaderMetadata); err != nil { 87 return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil) 88 } 89 stream.SetTrailer(testTrailerMetadata) 90 for { 91 in, err := stream.Recv() 92 if err == io.EOF { 93 // read done. 94 return nil 95 } 96 if err != nil { 97 return err 98 } 99 100 if in.Id == errorID { 101 return fmt.Errorf("got error id: %v", in.Id) 102 } 103 104 if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil { 105 return err 106 } 107 } 108} 109 110func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error { 111 if err := stream.SendHeader(testHeaderMetadata); err != nil { 112 return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil) 113 } 114 stream.SetTrailer(testTrailerMetadata) 115 for { 116 in, err := stream.Recv() 117 if err == io.EOF { 118 // read done. 119 return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)}) 120 } 121 if err != nil { 122 return err 123 } 124 125 if in.Id == errorID { 126 return fmt.Errorf("got error id: %v", in.Id) 127 } 128 } 129} 130 131func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error { 132 if err := stream.SendHeader(testHeaderMetadata); err != nil { 133 return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil) 134 } 135 stream.SetTrailer(testTrailerMetadata) 136 137 if in.Id == errorID { 138 return fmt.Errorf("got error id: %v", in.Id) 139 } 140 141 for i := 0; i < 5; i++ { 142 if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil { 143 return err 144 } 145 } 146 return nil 147} 148 149// test is an end-to-end test. It should be created with the newTest 150// func, modified as needed, and then started with its startServer method. 151// It should be cleaned up with the tearDown method. 152type test struct { 153 t *testing.T 154 compress string 155 clientStatsHandler stats.Handler 156 serverStatsHandler stats.Handler 157 158 testServer testpb.TestServiceServer // nil means none 159 // srv and srvAddr are set once startServer is called. 160 srv *grpc.Server 161 srvAddr string 162 163 cc *grpc.ClientConn // nil until requested via clientConn 164} 165 166func (te *test) tearDown() { 167 if te.cc != nil { 168 te.cc.Close() 169 te.cc = nil 170 } 171 te.srv.Stop() 172} 173 174type testConfig struct { 175 compress string 176} 177 178// newTest returns a new test using the provided testing.T and 179// environment. It is returned with default values. Tests should 180// modify it before calling its startServer and clientConn methods. 181func newTest(t *testing.T, tc *testConfig, ch stats.Handler, sh stats.Handler) *test { 182 te := &test{ 183 t: t, 184 compress: tc.compress, 185 clientStatsHandler: ch, 186 serverStatsHandler: sh, 187 } 188 return te 189} 190 191// startServer starts a gRPC server listening. Callers should defer a 192// call to te.tearDown to clean up. 193func (te *test) startServer(ts testpb.TestServiceServer) { 194 te.testServer = ts 195 lis, err := net.Listen("tcp", "localhost:0") 196 if err != nil { 197 te.t.Fatalf("Failed to listen: %v", err) 198 } 199 var opts []grpc.ServerOption 200 if te.compress == "gzip" { 201 opts = append(opts, 202 grpc.RPCCompressor(grpc.NewGZIPCompressor()), 203 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), 204 ) 205 } 206 if te.serverStatsHandler != nil { 207 opts = append(opts, grpc.StatsHandler(te.serverStatsHandler)) 208 } 209 s := grpc.NewServer(opts...) 210 te.srv = s 211 if te.testServer != nil { 212 testpb.RegisterTestServiceServer(s, te.testServer) 213 } 214 215 go s.Serve(lis) 216 te.srvAddr = lis.Addr().String() 217} 218 219func (te *test) clientConn() *grpc.ClientConn { 220 if te.cc != nil { 221 return te.cc 222 } 223 opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} 224 if te.compress == "gzip" { 225 opts = append(opts, 226 grpc.WithCompressor(grpc.NewGZIPCompressor()), 227 grpc.WithDecompressor(grpc.NewGZIPDecompressor()), 228 ) 229 } 230 if te.clientStatsHandler != nil { 231 opts = append(opts, grpc.WithStatsHandler(te.clientStatsHandler)) 232 } 233 234 var err error 235 te.cc, err = grpc.Dial(te.srvAddr, opts...) 236 if err != nil { 237 te.t.Fatalf("Dial(%q) = %v", te.srvAddr, err) 238 } 239 return te.cc 240} 241 242type rpcType int 243 244const ( 245 unaryRPC rpcType = iota 246 clientStreamRPC 247 serverStreamRPC 248 fullDuplexStreamRPC 249) 250 251type rpcConfig struct { 252 count int // Number of requests and responses for streaming RPCs. 253 success bool // Whether the RPC should succeed or return error. 254 failfast bool 255 callType rpcType // Type of RPC. 256} 257 258func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) { 259 var ( 260 resp *testpb.SimpleResponse 261 req *testpb.SimpleRequest 262 err error 263 ) 264 tc := testpb.NewTestServiceClient(te.clientConn()) 265 if c.success { 266 req = &testpb.SimpleRequest{Id: errorID + 1} 267 } else { 268 req = &testpb.SimpleRequest{Id: errorID} 269 } 270 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 271 resp, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(!c.failfast)) 272 return req, resp, err 273} 274 275func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest, []*testpb.SimpleResponse, error) { 276 var ( 277 reqs []*testpb.SimpleRequest 278 resps []*testpb.SimpleResponse 279 err error 280 ) 281 tc := testpb.NewTestServiceClient(te.clientConn()) 282 stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast)) 283 if err != nil { 284 return reqs, resps, err 285 } 286 var startID int32 287 if !c.success { 288 startID = errorID 289 } 290 for i := 0; i < c.count; i++ { 291 req := &testpb.SimpleRequest{ 292 Id: int32(i) + startID, 293 } 294 reqs = append(reqs, req) 295 if err = stream.Send(req); err != nil { 296 return reqs, resps, err 297 } 298 var resp *testpb.SimpleResponse 299 if resp, err = stream.Recv(); err != nil { 300 return reqs, resps, err 301 } 302 resps = append(resps, resp) 303 } 304 if err = stream.CloseSend(); err != nil && err != io.EOF { 305 return reqs, resps, err 306 } 307 if _, err = stream.Recv(); err != io.EOF { 308 return reqs, resps, err 309 } 310 311 return reqs, resps, nil 312} 313 314func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) { 315 var ( 316 reqs []*testpb.SimpleRequest 317 resp *testpb.SimpleResponse 318 err error 319 ) 320 tc := testpb.NewTestServiceClient(te.clientConn()) 321 stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast)) 322 if err != nil { 323 return reqs, resp, err 324 } 325 var startID int32 326 if !c.success { 327 startID = errorID 328 } 329 for i := 0; i < c.count; i++ { 330 req := &testpb.SimpleRequest{ 331 Id: int32(i) + startID, 332 } 333 reqs = append(reqs, req) 334 if err = stream.Send(req); err != nil { 335 return reqs, resp, err 336 } 337 } 338 resp, err = stream.CloseAndRecv() 339 return reqs, resp, err 340} 341 342func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) { 343 var ( 344 req *testpb.SimpleRequest 345 resps []*testpb.SimpleResponse 346 err error 347 ) 348 349 tc := testpb.NewTestServiceClient(te.clientConn()) 350 351 var startID int32 352 if !c.success { 353 startID = errorID 354 } 355 req = &testpb.SimpleRequest{Id: startID} 356 stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.WaitForReady(!c.failfast)) 357 if err != nil { 358 return req, resps, err 359 } 360 for { 361 var resp *testpb.SimpleResponse 362 resp, err := stream.Recv() 363 if err == io.EOF { 364 return req, resps, nil 365 } else if err != nil { 366 return req, resps, err 367 } 368 resps = append(resps, resp) 369 } 370} 371 372type expectedData struct { 373 method string 374 serverAddr string 375 compression string 376 reqIdx int 377 requests []*testpb.SimpleRequest 378 respIdx int 379 responses []*testpb.SimpleResponse 380 err error 381 failfast bool 382} 383 384type gotData struct { 385 ctx context.Context 386 client bool 387 s interface{} // This could be RPCStats or ConnStats. 388} 389 390const ( 391 begin int = iota 392 end 393 inPayload 394 inHeader 395 inTrailer 396 outPayload 397 outHeader 398 // TODO: test outTrailer ? 399 connBegin 400 connEnd 401) 402 403func checkBegin(t *testing.T, d *gotData, e *expectedData) { 404 var ( 405 ok bool 406 st *stats.Begin 407 ) 408 if st, ok = d.s.(*stats.Begin); !ok { 409 t.Fatalf("got %T, want Begin", d.s) 410 } 411 if d.ctx == nil { 412 t.Fatalf("d.ctx = nil, want <non-nil>") 413 } 414 if st.BeginTime.IsZero() { 415 t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime) 416 } 417 if d.client { 418 if st.FailFast != e.failfast { 419 t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast) 420 } 421 } 422} 423 424func checkInHeader(t *testing.T, d *gotData, e *expectedData) { 425 var ( 426 ok bool 427 st *stats.InHeader 428 ) 429 if st, ok = d.s.(*stats.InHeader); !ok { 430 t.Fatalf("got %T, want InHeader", d.s) 431 } 432 if d.ctx == nil { 433 t.Fatalf("d.ctx = nil, want <non-nil>") 434 } 435 if d.client { 436 // additional headers might be injected so instead of testing equality, test that all the 437 // expected headers keys have the expected header values. 438 for key := range testHeaderMetadata { 439 if !reflect.DeepEqual(st.Header.Get(key), testHeaderMetadata.Get(key)) { 440 t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testHeaderMetadata.Get(key)) 441 } 442 } 443 } else { 444 if st.FullMethod != e.method { 445 t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method) 446 } 447 if st.LocalAddr.String() != e.serverAddr { 448 t.Fatalf("st.LocalAddr = %v, want %v", st.LocalAddr, e.serverAddr) 449 } 450 if st.Compression != e.compression { 451 t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression) 452 } 453 // additional headers might be injected so instead of testing equality, test that all the 454 // expected headers keys have the expected header values. 455 for key := range testMetadata { 456 if !reflect.DeepEqual(st.Header.Get(key), testMetadata.Get(key)) { 457 t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testMetadata.Get(key)) 458 } 459 } 460 461 if connInfo, ok := d.ctx.Value(connCtxKey{}).(*stats.ConnTagInfo); ok { 462 if connInfo.RemoteAddr != st.RemoteAddr { 463 t.Fatalf("connInfo.RemoteAddr = %v, want %v", connInfo.RemoteAddr, st.RemoteAddr) 464 } 465 if connInfo.LocalAddr != st.LocalAddr { 466 t.Fatalf("connInfo.LocalAddr = %v, want %v", connInfo.LocalAddr, st.LocalAddr) 467 } 468 } else { 469 t.Fatalf("got context %v, want one with connCtxKey", d.ctx) 470 } 471 if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok { 472 if rpcInfo.FullMethodName != st.FullMethod { 473 t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod) 474 } 475 } else { 476 t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx) 477 } 478 } 479} 480 481func checkInPayload(t *testing.T, d *gotData, e *expectedData) { 482 var ( 483 ok bool 484 st *stats.InPayload 485 ) 486 if st, ok = d.s.(*stats.InPayload); !ok { 487 t.Fatalf("got %T, want InPayload", d.s) 488 } 489 if d.ctx == nil { 490 t.Fatalf("d.ctx = nil, want <non-nil>") 491 } 492 if d.client { 493 b, err := proto.Marshal(e.responses[e.respIdx]) 494 if err != nil { 495 t.Fatalf("failed to marshal message: %v", err) 496 } 497 if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) { 498 t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx]) 499 } 500 e.respIdx++ 501 if string(st.Data) != string(b) { 502 t.Fatalf("st.Data = %v, want %v", st.Data, b) 503 } 504 if st.Length != len(b) { 505 t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b)) 506 } 507 } else { 508 b, err := proto.Marshal(e.requests[e.reqIdx]) 509 if err != nil { 510 t.Fatalf("failed to marshal message: %v", err) 511 } 512 if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) { 513 t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx]) 514 } 515 e.reqIdx++ 516 if string(st.Data) != string(b) { 517 t.Fatalf("st.Data = %v, want %v", st.Data, b) 518 } 519 if st.Length != len(b) { 520 t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b)) 521 } 522 } 523 // Below are sanity checks that WireLength and RecvTime are populated. 524 // TODO: check values of WireLength and RecvTime. 525 if len(st.Data) > 0 && st.WireLength == 0 { 526 t.Fatalf("st.WireLength = %v with non-empty data, want <non-zero>", 527 st.WireLength) 528 } 529 if st.RecvTime.IsZero() { 530 t.Fatalf("st.ReceivedTime = %v, want <non-zero>", st.RecvTime) 531 } 532} 533 534func checkInTrailer(t *testing.T, d *gotData, e *expectedData) { 535 var ( 536 ok bool 537 st *stats.InTrailer 538 ) 539 if st, ok = d.s.(*stats.InTrailer); !ok { 540 t.Fatalf("got %T, want InTrailer", d.s) 541 } 542 if d.ctx == nil { 543 t.Fatalf("d.ctx = nil, want <non-nil>") 544 } 545 if !st.Client { 546 t.Fatalf("st IsClient = false, want true") 547 } 548 if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) { 549 t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata) 550 } 551} 552 553func checkOutHeader(t *testing.T, d *gotData, e *expectedData) { 554 var ( 555 ok bool 556 st *stats.OutHeader 557 ) 558 if st, ok = d.s.(*stats.OutHeader); !ok { 559 t.Fatalf("got %T, want OutHeader", d.s) 560 } 561 if d.ctx == nil { 562 t.Fatalf("d.ctx = nil, want <non-nil>") 563 } 564 if d.client { 565 if st.FullMethod != e.method { 566 t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method) 567 } 568 if st.RemoteAddr.String() != e.serverAddr { 569 t.Fatalf("st.RemoteAddr = %v, want %v", st.RemoteAddr, e.serverAddr) 570 } 571 if st.Compression != e.compression { 572 t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression) 573 } 574 // additional headers might be injected so instead of testing equality, test that all the 575 // expected headers keys have the expected header values. 576 for key := range testMetadata { 577 if !reflect.DeepEqual(st.Header.Get(key), testMetadata.Get(key)) { 578 t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testMetadata.Get(key)) 579 } 580 } 581 582 if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok { 583 if rpcInfo.FullMethodName != st.FullMethod { 584 t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod) 585 } 586 } else { 587 t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx) 588 } 589 } else { 590 // additional headers might be injected so instead of testing equality, test that all the 591 // expected headers keys have the expected header values. 592 for key := range testHeaderMetadata { 593 if !reflect.DeepEqual(st.Header.Get(key), testHeaderMetadata.Get(key)) { 594 t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testHeaderMetadata.Get(key)) 595 } 596 } 597 } 598} 599 600func checkOutPayload(t *testing.T, d *gotData, e *expectedData) { 601 var ( 602 ok bool 603 st *stats.OutPayload 604 ) 605 if st, ok = d.s.(*stats.OutPayload); !ok { 606 t.Fatalf("got %T, want OutPayload", d.s) 607 } 608 if d.ctx == nil { 609 t.Fatalf("d.ctx = nil, want <non-nil>") 610 } 611 if d.client { 612 b, err := proto.Marshal(e.requests[e.reqIdx]) 613 if err != nil { 614 t.Fatalf("failed to marshal message: %v", err) 615 } 616 if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) { 617 t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx]) 618 } 619 e.reqIdx++ 620 if string(st.Data) != string(b) { 621 t.Fatalf("st.Data = %v, want %v", st.Data, b) 622 } 623 if st.Length != len(b) { 624 t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b)) 625 } 626 } else { 627 b, err := proto.Marshal(e.responses[e.respIdx]) 628 if err != nil { 629 t.Fatalf("failed to marshal message: %v", err) 630 } 631 if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) { 632 t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx]) 633 } 634 e.respIdx++ 635 if string(st.Data) != string(b) { 636 t.Fatalf("st.Data = %v, want %v", st.Data, b) 637 } 638 if st.Length != len(b) { 639 t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b)) 640 } 641 } 642 // Below are sanity checks that WireLength and SentTime are populated. 643 // TODO: check values of WireLength and SentTime. 644 if len(st.Data) > 0 && st.WireLength == 0 { 645 t.Fatalf("st.WireLength = %v with non-empty data, want <non-zero>", 646 st.WireLength) 647 } 648 if st.SentTime.IsZero() { 649 t.Fatalf("st.SentTime = %v, want <non-zero>", st.SentTime) 650 } 651} 652 653func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) { 654 var ( 655 ok bool 656 st *stats.OutTrailer 657 ) 658 if st, ok = d.s.(*stats.OutTrailer); !ok { 659 t.Fatalf("got %T, want OutTrailer", d.s) 660 } 661 if d.ctx == nil { 662 t.Fatalf("d.ctx = nil, want <non-nil>") 663 } 664 if st.Client { 665 t.Fatalf("st IsClient = true, want false") 666 } 667 if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) { 668 t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata) 669 } 670} 671 672func checkEnd(t *testing.T, d *gotData, e *expectedData) { 673 var ( 674 ok bool 675 st *stats.End 676 ) 677 if st, ok = d.s.(*stats.End); !ok { 678 t.Fatalf("got %T, want End", d.s) 679 } 680 if d.ctx == nil { 681 t.Fatalf("d.ctx = nil, want <non-nil>") 682 } 683 if st.BeginTime.IsZero() { 684 t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime) 685 } 686 if st.EndTime.IsZero() { 687 t.Fatalf("st.EndTime = %v, want <non-zero>", st.EndTime) 688 } 689 690 actual, ok := status.FromError(st.Error) 691 if !ok { 692 t.Fatalf("expected st.Error to be a statusError, got %v (type %T)", st.Error, st.Error) 693 } 694 695 expectedStatus, _ := status.FromError(e.err) 696 if actual.Code() != expectedStatus.Code() || actual.Message() != expectedStatus.Message() { 697 t.Fatalf("st.Error = %v, want %v", st.Error, e.err) 698 } 699 700 if st.Client { 701 if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) { 702 t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata) 703 } 704 } else { 705 if st.Trailer != nil { 706 t.Fatalf("st.Trailer = %v, want nil", st.Trailer) 707 } 708 } 709} 710 711func checkConnBegin(t *testing.T, d *gotData, e *expectedData) { 712 var ( 713 ok bool 714 st *stats.ConnBegin 715 ) 716 if st, ok = d.s.(*stats.ConnBegin); !ok { 717 t.Fatalf("got %T, want ConnBegin", d.s) 718 } 719 if d.ctx == nil { 720 t.Fatalf("d.ctx = nil, want <non-nil>") 721 } 722 st.IsClient() // TODO remove this. 723} 724 725func checkConnEnd(t *testing.T, d *gotData, e *expectedData) { 726 var ( 727 ok bool 728 st *stats.ConnEnd 729 ) 730 if st, ok = d.s.(*stats.ConnEnd); !ok { 731 t.Fatalf("got %T, want ConnEnd", d.s) 732 } 733 if d.ctx == nil { 734 t.Fatalf("d.ctx = nil, want <non-nil>") 735 } 736 st.IsClient() // TODO remove this. 737} 738 739type statshandler struct { 740 mu sync.Mutex 741 gotRPC []*gotData 742 gotConn []*gotData 743} 744 745func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { 746 return context.WithValue(ctx, connCtxKey{}, info) 747} 748 749func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { 750 return context.WithValue(ctx, rpcCtxKey{}, info) 751} 752 753func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) { 754 h.mu.Lock() 755 defer h.mu.Unlock() 756 h.gotConn = append(h.gotConn, &gotData{ctx, s.IsClient(), s}) 757} 758 759func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) { 760 h.mu.Lock() 761 defer h.mu.Unlock() 762 h.gotRPC = append(h.gotRPC, &gotData{ctx, s.IsClient(), s}) 763} 764 765func checkConnStats(t *testing.T, got []*gotData) { 766 if len(got) <= 0 || len(got)%2 != 0 { 767 for i, g := range got { 768 t.Errorf(" - %v, %T = %+v, ctx: %v", i, g.s, g.s, g.ctx) 769 } 770 t.Fatalf("got %v stats, want even positive number", len(got)) 771 } 772 // The first conn stats must be a ConnBegin. 773 checkConnBegin(t, got[0], nil) 774 // The last conn stats must be a ConnEnd. 775 checkConnEnd(t, got[len(got)-1], nil) 776} 777 778func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) { 779 if len(got) != len(checkFuncs) { 780 for i, g := range got { 781 t.Errorf(" - %v, %T", i, g.s) 782 } 783 t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs)) 784 } 785 786 var rpcctx context.Context 787 for i := 0; i < len(got); i++ { 788 if _, ok := got[i].s.(stats.RPCStats); ok { 789 if rpcctx != nil && got[i].ctx != rpcctx { 790 t.Fatalf("got different contexts with stats %T", got[i].s) 791 } 792 rpcctx = got[i].ctx 793 } 794 } 795 796 for i, f := range checkFuncs { 797 f(t, got[i], expect) 798 } 799} 800 801func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) { 802 h := &statshandler{} 803 te := newTest(t, tc, nil, h) 804 te.startServer(&testServer{}) 805 defer te.tearDown() 806 807 var ( 808 reqs []*testpb.SimpleRequest 809 resps []*testpb.SimpleResponse 810 err error 811 method string 812 813 req *testpb.SimpleRequest 814 resp *testpb.SimpleResponse 815 e error 816 ) 817 818 switch cc.callType { 819 case unaryRPC: 820 method = "/grpc.testing.TestService/UnaryCall" 821 req, resp, e = te.doUnaryCall(cc) 822 reqs = []*testpb.SimpleRequest{req} 823 resps = []*testpb.SimpleResponse{resp} 824 err = e 825 case clientStreamRPC: 826 method = "/grpc.testing.TestService/ClientStreamCall" 827 reqs, resp, e = te.doClientStreamCall(cc) 828 resps = []*testpb.SimpleResponse{resp} 829 err = e 830 case serverStreamRPC: 831 method = "/grpc.testing.TestService/ServerStreamCall" 832 req, resps, e = te.doServerStreamCall(cc) 833 reqs = []*testpb.SimpleRequest{req} 834 err = e 835 case fullDuplexStreamRPC: 836 method = "/grpc.testing.TestService/FullDuplexCall" 837 reqs, resps, err = te.doFullDuplexCallRoundtrip(cc) 838 } 839 if cc.success != (err == nil) { 840 t.Fatalf("cc.success: %v, got error: %v", cc.success, err) 841 } 842 te.cc.Close() 843 te.srv.GracefulStop() // Wait for the server to stop. 844 845 for { 846 h.mu.Lock() 847 if len(h.gotRPC) >= len(checkFuncs) { 848 h.mu.Unlock() 849 break 850 } 851 h.mu.Unlock() 852 time.Sleep(10 * time.Millisecond) 853 } 854 855 for { 856 h.mu.Lock() 857 if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok { 858 h.mu.Unlock() 859 break 860 } 861 h.mu.Unlock() 862 time.Sleep(10 * time.Millisecond) 863 } 864 865 expect := &expectedData{ 866 serverAddr: te.srvAddr, 867 compression: tc.compress, 868 method: method, 869 requests: reqs, 870 responses: resps, 871 err: err, 872 } 873 874 h.mu.Lock() 875 checkConnStats(t, h.gotConn) 876 h.mu.Unlock() 877 checkServerStats(t, h.gotRPC, expect, checkFuncs) 878} 879 880func TestServerStatsUnaryRPC(t *testing.T) { 881 testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){ 882 checkInHeader, 883 checkBegin, 884 checkInPayload, 885 checkOutHeader, 886 checkOutPayload, 887 checkOutTrailer, 888 checkEnd, 889 }) 890} 891 892func TestServerStatsUnaryRPCError(t *testing.T) { 893 testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){ 894 checkInHeader, 895 checkBegin, 896 checkInPayload, 897 checkOutHeader, 898 checkOutTrailer, 899 checkEnd, 900 }) 901} 902 903func TestServerStatsClientStreamRPC(t *testing.T) { 904 count := 5 905 checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ 906 checkInHeader, 907 checkBegin, 908 checkOutHeader, 909 } 910 ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){ 911 checkInPayload, 912 } 913 for i := 0; i < count; i++ { 914 checkFuncs = append(checkFuncs, ioPayFuncs...) 915 } 916 checkFuncs = append(checkFuncs, 917 checkOutPayload, 918 checkOutTrailer, 919 checkEnd, 920 ) 921 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs) 922} 923 924func TestServerStatsClientStreamRPCError(t *testing.T) { 925 count := 1 926 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){ 927 checkInHeader, 928 checkBegin, 929 checkOutHeader, 930 checkInPayload, 931 checkOutTrailer, 932 checkEnd, 933 }) 934} 935 936func TestServerStatsServerStreamRPC(t *testing.T) { 937 count := 5 938 checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ 939 checkInHeader, 940 checkBegin, 941 checkInPayload, 942 checkOutHeader, 943 } 944 ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){ 945 checkOutPayload, 946 } 947 for i := 0; i < count; i++ { 948 checkFuncs = append(checkFuncs, ioPayFuncs...) 949 } 950 checkFuncs = append(checkFuncs, 951 checkOutTrailer, 952 checkEnd, 953 ) 954 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs) 955} 956 957func TestServerStatsServerStreamRPCError(t *testing.T) { 958 count := 5 959 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){ 960 checkInHeader, 961 checkBegin, 962 checkInPayload, 963 checkOutHeader, 964 checkOutTrailer, 965 checkEnd, 966 }) 967} 968 969func TestServerStatsFullDuplexRPC(t *testing.T) { 970 count := 5 971 checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ 972 checkInHeader, 973 checkBegin, 974 checkOutHeader, 975 } 976 ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){ 977 checkInPayload, 978 checkOutPayload, 979 } 980 for i := 0; i < count; i++ { 981 checkFuncs = append(checkFuncs, ioPayFuncs...) 982 } 983 checkFuncs = append(checkFuncs, 984 checkOutTrailer, 985 checkEnd, 986 ) 987 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs) 988} 989 990func TestServerStatsFullDuplexRPCError(t *testing.T) { 991 count := 5 992 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){ 993 checkInHeader, 994 checkBegin, 995 checkOutHeader, 996 checkInPayload, 997 checkOutTrailer, 998 checkEnd, 999 }) 1000} 1001 1002type checkFuncWithCount struct { 1003 f func(t *testing.T, d *gotData, e *expectedData) 1004 c int // expected count 1005} 1006 1007func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs map[int]*checkFuncWithCount) { 1008 var expectLen int 1009 for _, v := range checkFuncs { 1010 expectLen += v.c 1011 } 1012 if len(got) != expectLen { 1013 for i, g := range got { 1014 t.Errorf(" - %v, %T", i, g.s) 1015 } 1016 t.Fatalf("got %v stats, want %v stats", len(got), expectLen) 1017 } 1018 1019 var tagInfoInCtx *stats.RPCTagInfo 1020 for i := 0; i < len(got); i++ { 1021 if _, ok := got[i].s.(stats.RPCStats); ok { 1022 tagInfoInCtxNew, _ := got[i].ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo) 1023 if tagInfoInCtx != nil && tagInfoInCtx != tagInfoInCtxNew { 1024 t.Fatalf("got context containing different tagInfo with stats %T", got[i].s) 1025 } 1026 tagInfoInCtx = tagInfoInCtxNew 1027 } 1028 } 1029 1030 for _, s := range got { 1031 switch s.s.(type) { 1032 case *stats.Begin: 1033 if checkFuncs[begin].c <= 0 { 1034 t.Fatalf("unexpected stats: %T", s.s) 1035 } 1036 checkFuncs[begin].f(t, s, expect) 1037 checkFuncs[begin].c-- 1038 case *stats.OutHeader: 1039 if checkFuncs[outHeader].c <= 0 { 1040 t.Fatalf("unexpected stats: %T", s.s) 1041 } 1042 checkFuncs[outHeader].f(t, s, expect) 1043 checkFuncs[outHeader].c-- 1044 case *stats.OutPayload: 1045 if checkFuncs[outPayload].c <= 0 { 1046 t.Fatalf("unexpected stats: %T", s.s) 1047 } 1048 checkFuncs[outPayload].f(t, s, expect) 1049 checkFuncs[outPayload].c-- 1050 case *stats.InHeader: 1051 if checkFuncs[inHeader].c <= 0 { 1052 t.Fatalf("unexpected stats: %T", s.s) 1053 } 1054 checkFuncs[inHeader].f(t, s, expect) 1055 checkFuncs[inHeader].c-- 1056 case *stats.InPayload: 1057 if checkFuncs[inPayload].c <= 0 { 1058 t.Fatalf("unexpected stats: %T", s.s) 1059 } 1060 checkFuncs[inPayload].f(t, s, expect) 1061 checkFuncs[inPayload].c-- 1062 case *stats.InTrailer: 1063 if checkFuncs[inTrailer].c <= 0 { 1064 t.Fatalf("unexpected stats: %T", s.s) 1065 } 1066 checkFuncs[inTrailer].f(t, s, expect) 1067 checkFuncs[inTrailer].c-- 1068 case *stats.End: 1069 if checkFuncs[end].c <= 0 { 1070 t.Fatalf("unexpected stats: %T", s.s) 1071 } 1072 checkFuncs[end].f(t, s, expect) 1073 checkFuncs[end].c-- 1074 case *stats.ConnBegin: 1075 if checkFuncs[connBegin].c <= 0 { 1076 t.Fatalf("unexpected stats: %T", s.s) 1077 } 1078 checkFuncs[connBegin].f(t, s, expect) 1079 checkFuncs[connBegin].c-- 1080 case *stats.ConnEnd: 1081 if checkFuncs[connEnd].c <= 0 { 1082 t.Fatalf("unexpected stats: %T", s.s) 1083 } 1084 checkFuncs[connEnd].f(t, s, expect) 1085 checkFuncs[connEnd].c-- 1086 default: 1087 t.Fatalf("unexpected stats: %T", s.s) 1088 } 1089 } 1090} 1091 1092func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) { 1093 h := &statshandler{} 1094 te := newTest(t, tc, h, nil) 1095 te.startServer(&testServer{}) 1096 defer te.tearDown() 1097 1098 var ( 1099 reqs []*testpb.SimpleRequest 1100 resps []*testpb.SimpleResponse 1101 method string 1102 err error 1103 1104 req *testpb.SimpleRequest 1105 resp *testpb.SimpleResponse 1106 e error 1107 ) 1108 switch cc.callType { 1109 case unaryRPC: 1110 method = "/grpc.testing.TestService/UnaryCall" 1111 req, resp, e = te.doUnaryCall(cc) 1112 reqs = []*testpb.SimpleRequest{req} 1113 resps = []*testpb.SimpleResponse{resp} 1114 err = e 1115 case clientStreamRPC: 1116 method = "/grpc.testing.TestService/ClientStreamCall" 1117 reqs, resp, e = te.doClientStreamCall(cc) 1118 resps = []*testpb.SimpleResponse{resp} 1119 err = e 1120 case serverStreamRPC: 1121 method = "/grpc.testing.TestService/ServerStreamCall" 1122 req, resps, e = te.doServerStreamCall(cc) 1123 reqs = []*testpb.SimpleRequest{req} 1124 err = e 1125 case fullDuplexStreamRPC: 1126 method = "/grpc.testing.TestService/FullDuplexCall" 1127 reqs, resps, err = te.doFullDuplexCallRoundtrip(cc) 1128 } 1129 if cc.success != (err == nil) { 1130 t.Fatalf("cc.success: %v, got error: %v", cc.success, err) 1131 } 1132 te.cc.Close() 1133 te.srv.GracefulStop() // Wait for the server to stop. 1134 1135 lenRPCStats := 0 1136 for _, v := range checkFuncs { 1137 lenRPCStats += v.c 1138 } 1139 for { 1140 h.mu.Lock() 1141 if len(h.gotRPC) >= lenRPCStats { 1142 h.mu.Unlock() 1143 break 1144 } 1145 h.mu.Unlock() 1146 time.Sleep(10 * time.Millisecond) 1147 } 1148 1149 for { 1150 h.mu.Lock() 1151 if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok { 1152 h.mu.Unlock() 1153 break 1154 } 1155 h.mu.Unlock() 1156 time.Sleep(10 * time.Millisecond) 1157 } 1158 1159 expect := &expectedData{ 1160 serverAddr: te.srvAddr, 1161 compression: tc.compress, 1162 method: method, 1163 requests: reqs, 1164 responses: resps, 1165 failfast: cc.failfast, 1166 err: err, 1167 } 1168 1169 h.mu.Lock() 1170 checkConnStats(t, h.gotConn) 1171 h.mu.Unlock() 1172 checkClientStats(t, h.gotRPC, expect, checkFuncs) 1173} 1174 1175func TestClientStatsUnaryRPC(t *testing.T) { 1176 testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{ 1177 begin: {checkBegin, 1}, 1178 outHeader: {checkOutHeader, 1}, 1179 outPayload: {checkOutPayload, 1}, 1180 inHeader: {checkInHeader, 1}, 1181 inPayload: {checkInPayload, 1}, 1182 inTrailer: {checkInTrailer, 1}, 1183 end: {checkEnd, 1}, 1184 }) 1185} 1186 1187func TestClientStatsUnaryRPCError(t *testing.T) { 1188 testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{ 1189 begin: {checkBegin, 1}, 1190 outHeader: {checkOutHeader, 1}, 1191 outPayload: {checkOutPayload, 1}, 1192 inHeader: {checkInHeader, 1}, 1193 inTrailer: {checkInTrailer, 1}, 1194 end: {checkEnd, 1}, 1195 }) 1196} 1197 1198func TestClientStatsClientStreamRPC(t *testing.T) { 1199 count := 5 1200 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{ 1201 begin: {checkBegin, 1}, 1202 outHeader: {checkOutHeader, 1}, 1203 inHeader: {checkInHeader, 1}, 1204 outPayload: {checkOutPayload, count}, 1205 inTrailer: {checkInTrailer, 1}, 1206 inPayload: {checkInPayload, 1}, 1207 end: {checkEnd, 1}, 1208 }) 1209} 1210 1211func TestClientStatsClientStreamRPCError(t *testing.T) { 1212 count := 1 1213 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{ 1214 begin: {checkBegin, 1}, 1215 outHeader: {checkOutHeader, 1}, 1216 inHeader: {checkInHeader, 1}, 1217 outPayload: {checkOutPayload, 1}, 1218 inTrailer: {checkInTrailer, 1}, 1219 end: {checkEnd, 1}, 1220 }) 1221} 1222 1223func TestClientStatsServerStreamRPC(t *testing.T) { 1224 count := 5 1225 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{ 1226 begin: {checkBegin, 1}, 1227 outHeader: {checkOutHeader, 1}, 1228 outPayload: {checkOutPayload, 1}, 1229 inHeader: {checkInHeader, 1}, 1230 inPayload: {checkInPayload, count}, 1231 inTrailer: {checkInTrailer, 1}, 1232 end: {checkEnd, 1}, 1233 }) 1234} 1235 1236func TestClientStatsServerStreamRPCError(t *testing.T) { 1237 count := 5 1238 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{ 1239 begin: {checkBegin, 1}, 1240 outHeader: {checkOutHeader, 1}, 1241 outPayload: {checkOutPayload, 1}, 1242 inHeader: {checkInHeader, 1}, 1243 inTrailer: {checkInTrailer, 1}, 1244 end: {checkEnd, 1}, 1245 }) 1246} 1247 1248func TestClientStatsFullDuplexRPC(t *testing.T) { 1249 count := 5 1250 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{ 1251 begin: {checkBegin, 1}, 1252 outHeader: {checkOutHeader, 1}, 1253 outPayload: {checkOutPayload, count}, 1254 inHeader: {checkInHeader, 1}, 1255 inPayload: {checkInPayload, count}, 1256 inTrailer: {checkInTrailer, 1}, 1257 end: {checkEnd, 1}, 1258 }) 1259} 1260 1261func TestClientStatsFullDuplexRPCError(t *testing.T) { 1262 count := 5 1263 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{ 1264 begin: {checkBegin, 1}, 1265 outHeader: {checkOutHeader, 1}, 1266 outPayload: {checkOutPayload, 1}, 1267 inHeader: {checkInHeader, 1}, 1268 inTrailer: {checkInTrailer, 1}, 1269 end: {checkEnd, 1}, 1270 }) 1271} 1272 1273func TestTags(t *testing.T) { 1274 b := []byte{5, 2, 4, 3, 1} 1275 ctx := stats.SetTags(context.Background(), b) 1276 if tg := stats.OutgoingTags(ctx); !reflect.DeepEqual(tg, b) { 1277 t.Errorf("OutgoingTags(%v) = %v; want %v", ctx, tg, b) 1278 } 1279 if tg := stats.Tags(ctx); tg != nil { 1280 t.Errorf("Tags(%v) = %v; want nil", ctx, tg) 1281 } 1282 1283 ctx = stats.SetIncomingTags(context.Background(), b) 1284 if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, b) { 1285 t.Errorf("Tags(%v) = %v; want %v", ctx, tg, b) 1286 } 1287 if tg := stats.OutgoingTags(ctx); tg != nil { 1288 t.Errorf("OutgoingTags(%v) = %v; want nil", ctx, tg) 1289 } 1290} 1291 1292func TestTrace(t *testing.T) { 1293 b := []byte{5, 2, 4, 3, 1} 1294 ctx := stats.SetTrace(context.Background(), b) 1295 if tr := stats.OutgoingTrace(ctx); !reflect.DeepEqual(tr, b) { 1296 t.Errorf("OutgoingTrace(%v) = %v; want %v", ctx, tr, b) 1297 } 1298 if tr := stats.Trace(ctx); tr != nil { 1299 t.Errorf("Trace(%v) = %v; want nil", ctx, tr) 1300 } 1301 1302 ctx = stats.SetIncomingTrace(context.Background(), b) 1303 if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, b) { 1304 t.Errorf("Trace(%v) = %v; want %v", ctx, tr, b) 1305 } 1306 if tr := stats.OutgoingTrace(ctx); tr != nil { 1307 t.Errorf("OutgoingTrace(%v) = %v; want nil", ctx, tr) 1308 } 1309} 1310