1/*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package stats_test
20
21import (
22	"context"
23	"fmt"
24	"io"
25	"net"
26	"reflect"
27	"sync"
28	"testing"
29	"time"
30
31	"github.com/golang/protobuf/proto"
32	"google.golang.org/grpc"
33	"google.golang.org/grpc/metadata"
34	"google.golang.org/grpc/stats"
35	testpb "google.golang.org/grpc/stats/grpc_testing"
36	"google.golang.org/grpc/status"
37)
38
// init switches off gRPC tracing (grpc.EnableTracing) for every test in
// this package.
func init() {
	grpc.EnableTracing = false
}
42
// Context keys used by the stats handler in this file to stash the tag info
// it receives.
type (
	// connCtxKey is the key under which TagConn stores the *stats.ConnTagInfo.
	connCtxKey struct{}
	// rpcCtxKey is the key under which TagRPC stores the *stats.RPCTagInfo.
	rpcCtxKey struct{}
)
45
var (
	// testMetadata is attached by the client helpers to every outgoing RPC
	// (headers sent to the server).
	testMetadata = metadata.MD{
		"key1": []string{"value1"},
		"key2": []string{"value2"},
	}
	// testHeaderMetadata is sent by every service handler as the response
	// header (headers sent from the server).
	testHeaderMetadata = metadata.MD{
		"hkey1": []string{"headerValue1"},
		"hkey2": []string{"headerValue2"},
	}
	// testTrailerMetadata is set by every service handler as the response
	// trailer (trailers sent from the server).
	testTrailerMetadata = metadata.MD{
		"tkey1": []string{"trailerValue1"},
		"tkey2": []string{"trailerValue2"},
	}
	// errorID is the request id for which the service handlers return an
	// error instead of a response.
	errorID int32 = 32202
)
65
// testServer is the service implementation registered by the tests in this
// file. It embeds testpb.UnimplementedTestServiceServer and defines the
// UnaryCall, FullDuplexCall, ClientStreamCall and ServerStreamCall handlers.
type testServer struct {
	testpb.UnimplementedTestServiceServer
}
69
70func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
71	if err := grpc.SendHeader(ctx, testHeaderMetadata); err != nil {
72		return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testHeaderMetadata, err)
73	}
74	if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
75		return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
76	}
77
78	if in.Id == errorID {
79		return nil, fmt.Errorf("got error id: %v", in.Id)
80	}
81
82	return &testpb.SimpleResponse{Id: in.Id}, nil
83}
84
85func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
86	if err := stream.SendHeader(testHeaderMetadata); err != nil {
87		return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
88	}
89	stream.SetTrailer(testTrailerMetadata)
90	for {
91		in, err := stream.Recv()
92		if err == io.EOF {
93			// read done.
94			return nil
95		}
96		if err != nil {
97			return err
98		}
99
100		if in.Id == errorID {
101			return fmt.Errorf("got error id: %v", in.Id)
102		}
103
104		if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
105			return err
106		}
107	}
108}
109
110func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error {
111	if err := stream.SendHeader(testHeaderMetadata); err != nil {
112		return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
113	}
114	stream.SetTrailer(testTrailerMetadata)
115	for {
116		in, err := stream.Recv()
117		if err == io.EOF {
118			// read done.
119			return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)})
120		}
121		if err != nil {
122			return err
123		}
124
125		if in.Id == errorID {
126			return fmt.Errorf("got error id: %v", in.Id)
127		}
128	}
129}
130
131func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error {
132	if err := stream.SendHeader(testHeaderMetadata); err != nil {
133		return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil)
134	}
135	stream.SetTrailer(testTrailerMetadata)
136
137	if in.Id == errorID {
138		return fmt.Errorf("got error id: %v", in.Id)
139	}
140
141	for i := 0; i < 5; i++ {
142		if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
143			return err
144		}
145	}
146	return nil
147}
148
// test is an end-to-end test. It should be created with the newTest
// func, modified as needed, and then started with its startServer method.
// It should be cleaned up with the tearDown method.
type test struct {
	// t is used to report fatal setup errors (listen and dial failures).
	t *testing.T
	// compress selects compression; the value "gzip" installs the gzip
	// compressor and decompressor on both the server and the client.
	compress string
	// clientStatsHandler, when non-nil, is installed on the client connection.
	clientStatsHandler stats.Handler
	// serverStatsHandler, when non-nil, is installed on the server.
	serverStatsHandler stats.Handler

	testServer testpb.TestServiceServer // nil means none
	// srv and srvAddr are set once startServer is called.
	srv     *grpc.Server
	srvAddr string

	cc *grpc.ClientConn // nil until requested via clientConn
}
165
166func (te *test) tearDown() {
167	if te.cc != nil {
168		te.cc.Close()
169		te.cc = nil
170	}
171	te.srv.Stop()
172}
173
// testConfig carries the per-test settings that newTest copies into a test.
type testConfig struct {
	// compress is copied to test.compress; "gzip" enables gzip compression.
	compress string
}
177
178// newTest returns a new test using the provided testing.T and
179// environment.  It is returned with default values. Tests should
180// modify it before calling its startServer and clientConn methods.
181func newTest(t *testing.T, tc *testConfig, ch stats.Handler, sh stats.Handler) *test {
182	te := &test{
183		t:                  t,
184		compress:           tc.compress,
185		clientStatsHandler: ch,
186		serverStatsHandler: sh,
187	}
188	return te
189}
190
191// startServer starts a gRPC server listening. Callers should defer a
192// call to te.tearDown to clean up.
193func (te *test) startServer(ts testpb.TestServiceServer) {
194	te.testServer = ts
195	lis, err := net.Listen("tcp", "localhost:0")
196	if err != nil {
197		te.t.Fatalf("Failed to listen: %v", err)
198	}
199	var opts []grpc.ServerOption
200	if te.compress == "gzip" {
201		opts = append(opts,
202			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
203			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
204		)
205	}
206	if te.serverStatsHandler != nil {
207		opts = append(opts, grpc.StatsHandler(te.serverStatsHandler))
208	}
209	s := grpc.NewServer(opts...)
210	te.srv = s
211	if te.testServer != nil {
212		testpb.RegisterTestServiceServer(s, te.testServer)
213	}
214
215	go s.Serve(lis)
216	te.srvAddr = lis.Addr().String()
217}
218
219func (te *test) clientConn() *grpc.ClientConn {
220	if te.cc != nil {
221		return te.cc
222	}
223	opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
224	if te.compress == "gzip" {
225		opts = append(opts,
226			grpc.WithCompressor(grpc.NewGZIPCompressor()),
227			grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
228		)
229	}
230	if te.clientStatsHandler != nil {
231		opts = append(opts, grpc.WithStatsHandler(te.clientStatsHandler))
232	}
233
234	var err error
235	te.cc, err = grpc.Dial(te.srvAddr, opts...)
236	if err != nil {
237		te.t.Fatalf("Dial(%q) = %v", te.srvAddr, err)
238	}
239	return te.cc
240}
241
// rpcType identifies which kind of RPC a test performs.
type rpcType int

// The RPC kinds the test drivers switch on to pick the call helper and the
// full method name.
const (
	unaryRPC rpcType = iota
	clientStreamRPC
	serverStreamRPC
	fullDuplexStreamRPC
)
250
// rpcConfig describes the RPC that a test should perform.
type rpcConfig struct {
	count    int  // Number of requests and responses for streaming RPCs.
	success  bool // Whether the RPC should succeed or return error.
	failfast bool // The call helpers pass grpc.WaitForReady(!failfast).
	callType rpcType // Type of RPC.
}
257
258func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
259	var (
260		resp *testpb.SimpleResponse
261		req  *testpb.SimpleRequest
262		err  error
263	)
264	tc := testpb.NewTestServiceClient(te.clientConn())
265	if c.success {
266		req = &testpb.SimpleRequest{Id: errorID + 1}
267	} else {
268		req = &testpb.SimpleRequest{Id: errorID}
269	}
270	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
271	resp, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(!c.failfast))
272	return req, resp, err
273}
274
275func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
276	var (
277		reqs  []*testpb.SimpleRequest
278		resps []*testpb.SimpleResponse
279		err   error
280	)
281	tc := testpb.NewTestServiceClient(te.clientConn())
282	stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
283	if err != nil {
284		return reqs, resps, err
285	}
286	var startID int32
287	if !c.success {
288		startID = errorID
289	}
290	for i := 0; i < c.count; i++ {
291		req := &testpb.SimpleRequest{
292			Id: int32(i) + startID,
293		}
294		reqs = append(reqs, req)
295		if err = stream.Send(req); err != nil {
296			return reqs, resps, err
297		}
298		var resp *testpb.SimpleResponse
299		if resp, err = stream.Recv(); err != nil {
300			return reqs, resps, err
301		}
302		resps = append(resps, resp)
303	}
304	if err = stream.CloseSend(); err != nil && err != io.EOF {
305		return reqs, resps, err
306	}
307	if _, err = stream.Recv(); err != io.EOF {
308		return reqs, resps, err
309	}
310
311	return reqs, resps, nil
312}
313
314func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
315	var (
316		reqs []*testpb.SimpleRequest
317		resp *testpb.SimpleResponse
318		err  error
319	)
320	tc := testpb.NewTestServiceClient(te.clientConn())
321	stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
322	if err != nil {
323		return reqs, resp, err
324	}
325	var startID int32
326	if !c.success {
327		startID = errorID
328	}
329	for i := 0; i < c.count; i++ {
330		req := &testpb.SimpleRequest{
331			Id: int32(i) + startID,
332		}
333		reqs = append(reqs, req)
334		if err = stream.Send(req); err != nil {
335			return reqs, resp, err
336		}
337	}
338	resp, err = stream.CloseAndRecv()
339	return reqs, resp, err
340}
341
342func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
343	var (
344		req   *testpb.SimpleRequest
345		resps []*testpb.SimpleResponse
346		err   error
347	)
348
349	tc := testpb.NewTestServiceClient(te.clientConn())
350
351	var startID int32
352	if !c.success {
353		startID = errorID
354	}
355	req = &testpb.SimpleRequest{Id: startID}
356	stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.WaitForReady(!c.failfast))
357	if err != nil {
358		return req, resps, err
359	}
360	for {
361		var resp *testpb.SimpleResponse
362		resp, err := stream.Recv()
363		if err == io.EOF {
364			return req, resps, nil
365		} else if err != nil {
366			return req, resps, err
367		}
368		resps = append(resps, resp)
369	}
370}
371
// expectedData holds the values that the check functions compare recorded
// stats against.
type expectedData struct {
	method      string // expected full method name
	serverAddr  string // expected server address
	compression string // expected compression name
	// reqIdx is the index into requests of the next request a payload check
	// compares against; the payload checks advance it.
	reqIdx   int
	requests []*testpb.SimpleRequest
	// respIdx is the index into responses of the next response a payload
	// check compares against; the payload checks advance it.
	respIdx   int
	responses []*testpb.SimpleResponse
	err       error // expected RPC error, compared by checkEnd
	failfast  bool  // expected FailFast value, compared by checkBegin on the client side
}
383
// gotData is one stats event recorded by statshandler. The handler builds it
// with a positional composite literal, so the field order matters.
type gotData struct {
	ctx    context.Context // context passed to the handler method
	client bool            // result of IsClient() on the stats value
	s      interface{}     // This could be RPCStats or ConnStats.
}
389
// Keys naming each kind of stats event. checkClientStats uses them to index
// its checkFuncs map.
const (
	begin int = iota
	end
	inPayload
	inHeader
	inTrailer
	outPayload
	outHeader
	// TODO: test outTrailer ?
	connBegin
	connEnd
)
402
403func checkBegin(t *testing.T, d *gotData, e *expectedData) {
404	var (
405		ok bool
406		st *stats.Begin
407	)
408	if st, ok = d.s.(*stats.Begin); !ok {
409		t.Fatalf("got %T, want Begin", d.s)
410	}
411	if d.ctx == nil {
412		t.Fatalf("d.ctx = nil, want <non-nil>")
413	}
414	if st.BeginTime.IsZero() {
415		t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
416	}
417	if d.client {
418		if st.FailFast != e.failfast {
419			t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast)
420		}
421	}
422}
423
424func checkInHeader(t *testing.T, d *gotData, e *expectedData) {
425	var (
426		ok bool
427		st *stats.InHeader
428	)
429	if st, ok = d.s.(*stats.InHeader); !ok {
430		t.Fatalf("got %T, want InHeader", d.s)
431	}
432	if d.ctx == nil {
433		t.Fatalf("d.ctx = nil, want <non-nil>")
434	}
435	if d.client {
436		// additional headers might be injected so instead of testing equality, test that all the
437		// expected headers keys have the expected header values.
438		for key := range testHeaderMetadata {
439			if !reflect.DeepEqual(st.Header.Get(key), testHeaderMetadata.Get(key)) {
440				t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testHeaderMetadata.Get(key))
441			}
442		}
443	} else {
444		if st.FullMethod != e.method {
445			t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
446		}
447		if st.LocalAddr.String() != e.serverAddr {
448			t.Fatalf("st.LocalAddr = %v, want %v", st.LocalAddr, e.serverAddr)
449		}
450		if st.Compression != e.compression {
451			t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
452		}
453		// additional headers might be injected so instead of testing equality, test that all the
454		// expected headers keys have the expected header values.
455		for key := range testMetadata {
456			if !reflect.DeepEqual(st.Header.Get(key), testMetadata.Get(key)) {
457				t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testMetadata.Get(key))
458			}
459		}
460
461		if connInfo, ok := d.ctx.Value(connCtxKey{}).(*stats.ConnTagInfo); ok {
462			if connInfo.RemoteAddr != st.RemoteAddr {
463				t.Fatalf("connInfo.RemoteAddr = %v, want %v", connInfo.RemoteAddr, st.RemoteAddr)
464			}
465			if connInfo.LocalAddr != st.LocalAddr {
466				t.Fatalf("connInfo.LocalAddr = %v, want %v", connInfo.LocalAddr, st.LocalAddr)
467			}
468		} else {
469			t.Fatalf("got context %v, want one with connCtxKey", d.ctx)
470		}
471		if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
472			if rpcInfo.FullMethodName != st.FullMethod {
473				t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
474			}
475		} else {
476			t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
477		}
478	}
479}
480
481func checkInPayload(t *testing.T, d *gotData, e *expectedData) {
482	var (
483		ok bool
484		st *stats.InPayload
485	)
486	if st, ok = d.s.(*stats.InPayload); !ok {
487		t.Fatalf("got %T, want InPayload", d.s)
488	}
489	if d.ctx == nil {
490		t.Fatalf("d.ctx = nil, want <non-nil>")
491	}
492	if d.client {
493		b, err := proto.Marshal(e.responses[e.respIdx])
494		if err != nil {
495			t.Fatalf("failed to marshal message: %v", err)
496		}
497		if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
498			t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
499		}
500		e.respIdx++
501		if string(st.Data) != string(b) {
502			t.Fatalf("st.Data = %v, want %v", st.Data, b)
503		}
504		if st.Length != len(b) {
505			t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
506		}
507	} else {
508		b, err := proto.Marshal(e.requests[e.reqIdx])
509		if err != nil {
510			t.Fatalf("failed to marshal message: %v", err)
511		}
512		if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
513			t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
514		}
515		e.reqIdx++
516		if string(st.Data) != string(b) {
517			t.Fatalf("st.Data = %v, want %v", st.Data, b)
518		}
519		if st.Length != len(b) {
520			t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
521		}
522	}
523	// Below are sanity checks that WireLength and RecvTime are populated.
524	// TODO: check values of WireLength and RecvTime.
525	if len(st.Data) > 0 && st.WireLength == 0 {
526		t.Fatalf("st.WireLength = %v with non-empty data, want <non-zero>",
527			st.WireLength)
528	}
529	if st.RecvTime.IsZero() {
530		t.Fatalf("st.ReceivedTime = %v, want <non-zero>", st.RecvTime)
531	}
532}
533
534func checkInTrailer(t *testing.T, d *gotData, e *expectedData) {
535	var (
536		ok bool
537		st *stats.InTrailer
538	)
539	if st, ok = d.s.(*stats.InTrailer); !ok {
540		t.Fatalf("got %T, want InTrailer", d.s)
541	}
542	if d.ctx == nil {
543		t.Fatalf("d.ctx = nil, want <non-nil>")
544	}
545	if !st.Client {
546		t.Fatalf("st IsClient = false, want true")
547	}
548	if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) {
549		t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata)
550	}
551}
552
553func checkOutHeader(t *testing.T, d *gotData, e *expectedData) {
554	var (
555		ok bool
556		st *stats.OutHeader
557	)
558	if st, ok = d.s.(*stats.OutHeader); !ok {
559		t.Fatalf("got %T, want OutHeader", d.s)
560	}
561	if d.ctx == nil {
562		t.Fatalf("d.ctx = nil, want <non-nil>")
563	}
564	if d.client {
565		if st.FullMethod != e.method {
566			t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
567		}
568		if st.RemoteAddr.String() != e.serverAddr {
569			t.Fatalf("st.RemoteAddr = %v, want %v", st.RemoteAddr, e.serverAddr)
570		}
571		if st.Compression != e.compression {
572			t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
573		}
574		// additional headers might be injected so instead of testing equality, test that all the
575		// expected headers keys have the expected header values.
576		for key := range testMetadata {
577			if !reflect.DeepEqual(st.Header.Get(key), testMetadata.Get(key)) {
578				t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testMetadata.Get(key))
579			}
580		}
581
582		if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
583			if rpcInfo.FullMethodName != st.FullMethod {
584				t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
585			}
586		} else {
587			t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
588		}
589	} else {
590		// additional headers might be injected so instead of testing equality, test that all the
591		// expected headers keys have the expected header values.
592		for key := range testHeaderMetadata {
593			if !reflect.DeepEqual(st.Header.Get(key), testHeaderMetadata.Get(key)) {
594				t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testHeaderMetadata.Get(key))
595			}
596		}
597	}
598}
599
600func checkOutPayload(t *testing.T, d *gotData, e *expectedData) {
601	var (
602		ok bool
603		st *stats.OutPayload
604	)
605	if st, ok = d.s.(*stats.OutPayload); !ok {
606		t.Fatalf("got %T, want OutPayload", d.s)
607	}
608	if d.ctx == nil {
609		t.Fatalf("d.ctx = nil, want <non-nil>")
610	}
611	if d.client {
612		b, err := proto.Marshal(e.requests[e.reqIdx])
613		if err != nil {
614			t.Fatalf("failed to marshal message: %v", err)
615		}
616		if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
617			t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
618		}
619		e.reqIdx++
620		if string(st.Data) != string(b) {
621			t.Fatalf("st.Data = %v, want %v", st.Data, b)
622		}
623		if st.Length != len(b) {
624			t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
625		}
626	} else {
627		b, err := proto.Marshal(e.responses[e.respIdx])
628		if err != nil {
629			t.Fatalf("failed to marshal message: %v", err)
630		}
631		if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
632			t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
633		}
634		e.respIdx++
635		if string(st.Data) != string(b) {
636			t.Fatalf("st.Data = %v, want %v", st.Data, b)
637		}
638		if st.Length != len(b) {
639			t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
640		}
641	}
642	// Below are sanity checks that WireLength and SentTime are populated.
643	// TODO: check values of WireLength and SentTime.
644	if len(st.Data) > 0 && st.WireLength == 0 {
645		t.Fatalf("st.WireLength = %v with non-empty data, want <non-zero>",
646			st.WireLength)
647	}
648	if st.SentTime.IsZero() {
649		t.Fatalf("st.SentTime = %v, want <non-zero>", st.SentTime)
650	}
651}
652
653func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) {
654	var (
655		ok bool
656		st *stats.OutTrailer
657	)
658	if st, ok = d.s.(*stats.OutTrailer); !ok {
659		t.Fatalf("got %T, want OutTrailer", d.s)
660	}
661	if d.ctx == nil {
662		t.Fatalf("d.ctx = nil, want <non-nil>")
663	}
664	if st.Client {
665		t.Fatalf("st IsClient = true, want false")
666	}
667	if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) {
668		t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata)
669	}
670}
671
672func checkEnd(t *testing.T, d *gotData, e *expectedData) {
673	var (
674		ok bool
675		st *stats.End
676	)
677	if st, ok = d.s.(*stats.End); !ok {
678		t.Fatalf("got %T, want End", d.s)
679	}
680	if d.ctx == nil {
681		t.Fatalf("d.ctx = nil, want <non-nil>")
682	}
683	if st.BeginTime.IsZero() {
684		t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
685	}
686	if st.EndTime.IsZero() {
687		t.Fatalf("st.EndTime = %v, want <non-zero>", st.EndTime)
688	}
689
690	actual, ok := status.FromError(st.Error)
691	if !ok {
692		t.Fatalf("expected st.Error to be a statusError, got %v (type %T)", st.Error, st.Error)
693	}
694
695	expectedStatus, _ := status.FromError(e.err)
696	if actual.Code() != expectedStatus.Code() || actual.Message() != expectedStatus.Message() {
697		t.Fatalf("st.Error = %v, want %v", st.Error, e.err)
698	}
699
700	if st.Client {
701		if !reflect.DeepEqual(st.Trailer, testTrailerMetadata) {
702			t.Fatalf("st.Trailer = %v, want %v", st.Trailer, testTrailerMetadata)
703		}
704	} else {
705		if st.Trailer != nil {
706			t.Fatalf("st.Trailer = %v, want nil", st.Trailer)
707		}
708	}
709}
710
711func checkConnBegin(t *testing.T, d *gotData, e *expectedData) {
712	var (
713		ok bool
714		st *stats.ConnBegin
715	)
716	if st, ok = d.s.(*stats.ConnBegin); !ok {
717		t.Fatalf("got %T, want ConnBegin", d.s)
718	}
719	if d.ctx == nil {
720		t.Fatalf("d.ctx = nil, want <non-nil>")
721	}
722	st.IsClient() // TODO remove this.
723}
724
725func checkConnEnd(t *testing.T, d *gotData, e *expectedData) {
726	var (
727		ok bool
728		st *stats.ConnEnd
729	)
730	if st, ok = d.s.(*stats.ConnEnd); !ok {
731		t.Fatalf("got %T, want ConnEnd", d.s)
732	}
733	if d.ctx == nil {
734		t.Fatalf("d.ctx = nil, want <non-nil>")
735	}
736	st.IsClient() // TODO remove this.
737}
738
// statshandler is the stats handler used by the tests. It records every
// stats value passed to HandleRPC and HandleConn.
type statshandler struct {
	mu      sync.Mutex // held by HandleRPC and HandleConn while appending
	gotRPC  []*gotData // values recorded by HandleRPC, in arrival order
	gotConn []*gotData // values recorded by HandleConn, in arrival order
}
744
745func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
746	return context.WithValue(ctx, connCtxKey{}, info)
747}
748
749func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
750	return context.WithValue(ctx, rpcCtxKey{}, info)
751}
752
753func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
754	h.mu.Lock()
755	defer h.mu.Unlock()
756	h.gotConn = append(h.gotConn, &gotData{ctx, s.IsClient(), s})
757}
758
759func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
760	h.mu.Lock()
761	defer h.mu.Unlock()
762	h.gotRPC = append(h.gotRPC, &gotData{ctx, s.IsClient(), s})
763}
764
765func checkConnStats(t *testing.T, got []*gotData) {
766	if len(got) <= 0 || len(got)%2 != 0 {
767		for i, g := range got {
768			t.Errorf(" - %v, %T = %+v, ctx: %v", i, g.s, g.s, g.ctx)
769		}
770		t.Fatalf("got %v stats, want even positive number", len(got))
771	}
772	// The first conn stats must be a ConnBegin.
773	checkConnBegin(t, got[0], nil)
774	// The last conn stats must be a ConnEnd.
775	checkConnEnd(t, got[len(got)-1], nil)
776}
777
778func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
779	if len(got) != len(checkFuncs) {
780		for i, g := range got {
781			t.Errorf(" - %v, %T", i, g.s)
782		}
783		t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs))
784	}
785
786	var rpcctx context.Context
787	for i := 0; i < len(got); i++ {
788		if _, ok := got[i].s.(stats.RPCStats); ok {
789			if rpcctx != nil && got[i].ctx != rpcctx {
790				t.Fatalf("got different contexts with stats %T", got[i].s)
791			}
792			rpcctx = got[i].ctx
793		}
794	}
795
796	for i, f := range checkFuncs {
797		f(t, got[i], expect)
798	}
799}
800
801func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
802	h := &statshandler{}
803	te := newTest(t, tc, nil, h)
804	te.startServer(&testServer{})
805	defer te.tearDown()
806
807	var (
808		reqs   []*testpb.SimpleRequest
809		resps  []*testpb.SimpleResponse
810		err    error
811		method string
812
813		req  *testpb.SimpleRequest
814		resp *testpb.SimpleResponse
815		e    error
816	)
817
818	switch cc.callType {
819	case unaryRPC:
820		method = "/grpc.testing.TestService/UnaryCall"
821		req, resp, e = te.doUnaryCall(cc)
822		reqs = []*testpb.SimpleRequest{req}
823		resps = []*testpb.SimpleResponse{resp}
824		err = e
825	case clientStreamRPC:
826		method = "/grpc.testing.TestService/ClientStreamCall"
827		reqs, resp, e = te.doClientStreamCall(cc)
828		resps = []*testpb.SimpleResponse{resp}
829		err = e
830	case serverStreamRPC:
831		method = "/grpc.testing.TestService/ServerStreamCall"
832		req, resps, e = te.doServerStreamCall(cc)
833		reqs = []*testpb.SimpleRequest{req}
834		err = e
835	case fullDuplexStreamRPC:
836		method = "/grpc.testing.TestService/FullDuplexCall"
837		reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
838	}
839	if cc.success != (err == nil) {
840		t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
841	}
842	te.cc.Close()
843	te.srv.GracefulStop() // Wait for the server to stop.
844
845	for {
846		h.mu.Lock()
847		if len(h.gotRPC) >= len(checkFuncs) {
848			h.mu.Unlock()
849			break
850		}
851		h.mu.Unlock()
852		time.Sleep(10 * time.Millisecond)
853	}
854
855	for {
856		h.mu.Lock()
857		if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
858			h.mu.Unlock()
859			break
860		}
861		h.mu.Unlock()
862		time.Sleep(10 * time.Millisecond)
863	}
864
865	expect := &expectedData{
866		serverAddr:  te.srvAddr,
867		compression: tc.compress,
868		method:      method,
869		requests:    reqs,
870		responses:   resps,
871		err:         err,
872	}
873
874	h.mu.Lock()
875	checkConnStats(t, h.gotConn)
876	h.mu.Unlock()
877	checkServerStats(t, h.gotRPC, expect, checkFuncs)
878}
879
880func TestServerStatsUnaryRPC(t *testing.T) {
881	testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
882		checkInHeader,
883		checkBegin,
884		checkInPayload,
885		checkOutHeader,
886		checkOutPayload,
887		checkOutTrailer,
888		checkEnd,
889	})
890}
891
892func TestServerStatsUnaryRPCError(t *testing.T) {
893	testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
894		checkInHeader,
895		checkBegin,
896		checkInPayload,
897		checkOutHeader,
898		checkOutTrailer,
899		checkEnd,
900	})
901}
902
903func TestServerStatsClientStreamRPC(t *testing.T) {
904	count := 5
905	checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
906		checkInHeader,
907		checkBegin,
908		checkOutHeader,
909	}
910	ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
911		checkInPayload,
912	}
913	for i := 0; i < count; i++ {
914		checkFuncs = append(checkFuncs, ioPayFuncs...)
915	}
916	checkFuncs = append(checkFuncs,
917		checkOutPayload,
918		checkOutTrailer,
919		checkEnd,
920	)
921	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs)
922}
923
924func TestServerStatsClientStreamRPCError(t *testing.T) {
925	count := 1
926	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
927		checkInHeader,
928		checkBegin,
929		checkOutHeader,
930		checkInPayload,
931		checkOutTrailer,
932		checkEnd,
933	})
934}
935
936func TestServerStatsServerStreamRPC(t *testing.T) {
937	count := 5
938	checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
939		checkInHeader,
940		checkBegin,
941		checkInPayload,
942		checkOutHeader,
943	}
944	ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
945		checkOutPayload,
946	}
947	for i := 0; i < count; i++ {
948		checkFuncs = append(checkFuncs, ioPayFuncs...)
949	}
950	checkFuncs = append(checkFuncs,
951		checkOutTrailer,
952		checkEnd,
953	)
954	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs)
955}
956
957func TestServerStatsServerStreamRPCError(t *testing.T) {
958	count := 5
959	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
960		checkInHeader,
961		checkBegin,
962		checkInPayload,
963		checkOutHeader,
964		checkOutTrailer,
965		checkEnd,
966	})
967}
968
969func TestServerStatsFullDuplexRPC(t *testing.T) {
970	count := 5
971	checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
972		checkInHeader,
973		checkBegin,
974		checkOutHeader,
975	}
976	ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
977		checkInPayload,
978		checkOutPayload,
979	}
980	for i := 0; i < count; i++ {
981		checkFuncs = append(checkFuncs, ioPayFuncs...)
982	}
983	checkFuncs = append(checkFuncs,
984		checkOutTrailer,
985		checkEnd,
986	)
987	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs)
988}
989
990func TestServerStatsFullDuplexRPCError(t *testing.T) {
991	count := 5
992	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
993		checkInHeader,
994		checkBegin,
995		checkOutHeader,
996		checkInPayload,
997		checkOutTrailer,
998		checkEnd,
999	})
1000}
1001
// checkFuncWithCount pairs a check function with the number of stats values
// of its kind that are still expected. checkClientStats decrements c each
// time it applies f.
type checkFuncWithCount struct {
	f func(t *testing.T, d *gotData, e *expectedData)
	c int // expected count
}
1006
1007func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs map[int]*checkFuncWithCount) {
1008	var expectLen int
1009	for _, v := range checkFuncs {
1010		expectLen += v.c
1011	}
1012	if len(got) != expectLen {
1013		for i, g := range got {
1014			t.Errorf(" - %v, %T", i, g.s)
1015		}
1016		t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
1017	}
1018
1019	var tagInfoInCtx *stats.RPCTagInfo
1020	for i := 0; i < len(got); i++ {
1021		if _, ok := got[i].s.(stats.RPCStats); ok {
1022			tagInfoInCtxNew, _ := got[i].ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo)
1023			if tagInfoInCtx != nil && tagInfoInCtx != tagInfoInCtxNew {
1024				t.Fatalf("got context containing different tagInfo with stats %T", got[i].s)
1025			}
1026			tagInfoInCtx = tagInfoInCtxNew
1027		}
1028	}
1029
1030	for _, s := range got {
1031		switch s.s.(type) {
1032		case *stats.Begin:
1033			if checkFuncs[begin].c <= 0 {
1034				t.Fatalf("unexpected stats: %T", s.s)
1035			}
1036			checkFuncs[begin].f(t, s, expect)
1037			checkFuncs[begin].c--
1038		case *stats.OutHeader:
1039			if checkFuncs[outHeader].c <= 0 {
1040				t.Fatalf("unexpected stats: %T", s.s)
1041			}
1042			checkFuncs[outHeader].f(t, s, expect)
1043			checkFuncs[outHeader].c--
1044		case *stats.OutPayload:
1045			if checkFuncs[outPayload].c <= 0 {
1046				t.Fatalf("unexpected stats: %T", s.s)
1047			}
1048			checkFuncs[outPayload].f(t, s, expect)
1049			checkFuncs[outPayload].c--
1050		case *stats.InHeader:
1051			if checkFuncs[inHeader].c <= 0 {
1052				t.Fatalf("unexpected stats: %T", s.s)
1053			}
1054			checkFuncs[inHeader].f(t, s, expect)
1055			checkFuncs[inHeader].c--
1056		case *stats.InPayload:
1057			if checkFuncs[inPayload].c <= 0 {
1058				t.Fatalf("unexpected stats: %T", s.s)
1059			}
1060			checkFuncs[inPayload].f(t, s, expect)
1061			checkFuncs[inPayload].c--
1062		case *stats.InTrailer:
1063			if checkFuncs[inTrailer].c <= 0 {
1064				t.Fatalf("unexpected stats: %T", s.s)
1065			}
1066			checkFuncs[inTrailer].f(t, s, expect)
1067			checkFuncs[inTrailer].c--
1068		case *stats.End:
1069			if checkFuncs[end].c <= 0 {
1070				t.Fatalf("unexpected stats: %T", s.s)
1071			}
1072			checkFuncs[end].f(t, s, expect)
1073			checkFuncs[end].c--
1074		case *stats.ConnBegin:
1075			if checkFuncs[connBegin].c <= 0 {
1076				t.Fatalf("unexpected stats: %T", s.s)
1077			}
1078			checkFuncs[connBegin].f(t, s, expect)
1079			checkFuncs[connBegin].c--
1080		case *stats.ConnEnd:
1081			if checkFuncs[connEnd].c <= 0 {
1082				t.Fatalf("unexpected stats: %T", s.s)
1083			}
1084			checkFuncs[connEnd].f(t, s, expect)
1085			checkFuncs[connEnd].c--
1086		default:
1087			t.Fatalf("unexpected stats: %T", s.s)
1088		}
1089	}
1090}
1091
1092func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) {
1093	h := &statshandler{}
1094	te := newTest(t, tc, h, nil)
1095	te.startServer(&testServer{})
1096	defer te.tearDown()
1097
1098	var (
1099		reqs   []*testpb.SimpleRequest
1100		resps  []*testpb.SimpleResponse
1101		method string
1102		err    error
1103
1104		req  *testpb.SimpleRequest
1105		resp *testpb.SimpleResponse
1106		e    error
1107	)
1108	switch cc.callType {
1109	case unaryRPC:
1110		method = "/grpc.testing.TestService/UnaryCall"
1111		req, resp, e = te.doUnaryCall(cc)
1112		reqs = []*testpb.SimpleRequest{req}
1113		resps = []*testpb.SimpleResponse{resp}
1114		err = e
1115	case clientStreamRPC:
1116		method = "/grpc.testing.TestService/ClientStreamCall"
1117		reqs, resp, e = te.doClientStreamCall(cc)
1118		resps = []*testpb.SimpleResponse{resp}
1119		err = e
1120	case serverStreamRPC:
1121		method = "/grpc.testing.TestService/ServerStreamCall"
1122		req, resps, e = te.doServerStreamCall(cc)
1123		reqs = []*testpb.SimpleRequest{req}
1124		err = e
1125	case fullDuplexStreamRPC:
1126		method = "/grpc.testing.TestService/FullDuplexCall"
1127		reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
1128	}
1129	if cc.success != (err == nil) {
1130		t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
1131	}
1132	te.cc.Close()
1133	te.srv.GracefulStop() // Wait for the server to stop.
1134
1135	lenRPCStats := 0
1136	for _, v := range checkFuncs {
1137		lenRPCStats += v.c
1138	}
1139	for {
1140		h.mu.Lock()
1141		if len(h.gotRPC) >= lenRPCStats {
1142			h.mu.Unlock()
1143			break
1144		}
1145		h.mu.Unlock()
1146		time.Sleep(10 * time.Millisecond)
1147	}
1148
1149	for {
1150		h.mu.Lock()
1151		if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
1152			h.mu.Unlock()
1153			break
1154		}
1155		h.mu.Unlock()
1156		time.Sleep(10 * time.Millisecond)
1157	}
1158
1159	expect := &expectedData{
1160		serverAddr:  te.srvAddr,
1161		compression: tc.compress,
1162		method:      method,
1163		requests:    reqs,
1164		responses:   resps,
1165		failfast:    cc.failfast,
1166		err:         err,
1167	}
1168
1169	h.mu.Lock()
1170	checkConnStats(t, h.gotConn)
1171	h.mu.Unlock()
1172	checkClientStats(t, h.gotRPC, expect, checkFuncs)
1173}
1174
1175func TestClientStatsUnaryRPC(t *testing.T) {
1176	testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
1177		begin:      {checkBegin, 1},
1178		outHeader:  {checkOutHeader, 1},
1179		outPayload: {checkOutPayload, 1},
1180		inHeader:   {checkInHeader, 1},
1181		inPayload:  {checkInPayload, 1},
1182		inTrailer:  {checkInTrailer, 1},
1183		end:        {checkEnd, 1},
1184	})
1185}
1186
1187func TestClientStatsUnaryRPCError(t *testing.T) {
1188	testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
1189		begin:      {checkBegin, 1},
1190		outHeader:  {checkOutHeader, 1},
1191		outPayload: {checkOutPayload, 1},
1192		inHeader:   {checkInHeader, 1},
1193		inTrailer:  {checkInTrailer, 1},
1194		end:        {checkEnd, 1},
1195	})
1196}
1197
1198func TestClientStatsClientStreamRPC(t *testing.T) {
1199	count := 5
1200	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
1201		begin:      {checkBegin, 1},
1202		outHeader:  {checkOutHeader, 1},
1203		inHeader:   {checkInHeader, 1},
1204		outPayload: {checkOutPayload, count},
1205		inTrailer:  {checkInTrailer, 1},
1206		inPayload:  {checkInPayload, 1},
1207		end:        {checkEnd, 1},
1208	})
1209}
1210
1211func TestClientStatsClientStreamRPCError(t *testing.T) {
1212	count := 1
1213	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
1214		begin:      {checkBegin, 1},
1215		outHeader:  {checkOutHeader, 1},
1216		inHeader:   {checkInHeader, 1},
1217		outPayload: {checkOutPayload, 1},
1218		inTrailer:  {checkInTrailer, 1},
1219		end:        {checkEnd, 1},
1220	})
1221}
1222
1223func TestClientStatsServerStreamRPC(t *testing.T) {
1224	count := 5
1225	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
1226		begin:      {checkBegin, 1},
1227		outHeader:  {checkOutHeader, 1},
1228		outPayload: {checkOutPayload, 1},
1229		inHeader:   {checkInHeader, 1},
1230		inPayload:  {checkInPayload, count},
1231		inTrailer:  {checkInTrailer, 1},
1232		end:        {checkEnd, 1},
1233	})
1234}
1235
1236func TestClientStatsServerStreamRPCError(t *testing.T) {
1237	count := 5
1238	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
1239		begin:      {checkBegin, 1},
1240		outHeader:  {checkOutHeader, 1},
1241		outPayload: {checkOutPayload, 1},
1242		inHeader:   {checkInHeader, 1},
1243		inTrailer:  {checkInTrailer, 1},
1244		end:        {checkEnd, 1},
1245	})
1246}
1247
1248func TestClientStatsFullDuplexRPC(t *testing.T) {
1249	count := 5
1250	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
1251		begin:      {checkBegin, 1},
1252		outHeader:  {checkOutHeader, 1},
1253		outPayload: {checkOutPayload, count},
1254		inHeader:   {checkInHeader, 1},
1255		inPayload:  {checkInPayload, count},
1256		inTrailer:  {checkInTrailer, 1},
1257		end:        {checkEnd, 1},
1258	})
1259}
1260
1261func TestClientStatsFullDuplexRPCError(t *testing.T) {
1262	count := 5
1263	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
1264		begin:      {checkBegin, 1},
1265		outHeader:  {checkOutHeader, 1},
1266		outPayload: {checkOutPayload, 1},
1267		inHeader:   {checkInHeader, 1},
1268		inTrailer:  {checkInTrailer, 1},
1269		end:        {checkEnd, 1},
1270	})
1271}
1272
1273func TestTags(t *testing.T) {
1274	b := []byte{5, 2, 4, 3, 1}
1275	ctx := stats.SetTags(context.Background(), b)
1276	if tg := stats.OutgoingTags(ctx); !reflect.DeepEqual(tg, b) {
1277		t.Errorf("OutgoingTags(%v) = %v; want %v", ctx, tg, b)
1278	}
1279	if tg := stats.Tags(ctx); tg != nil {
1280		t.Errorf("Tags(%v) = %v; want nil", ctx, tg)
1281	}
1282
1283	ctx = stats.SetIncomingTags(context.Background(), b)
1284	if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, b) {
1285		t.Errorf("Tags(%v) = %v; want %v", ctx, tg, b)
1286	}
1287	if tg := stats.OutgoingTags(ctx); tg != nil {
1288		t.Errorf("OutgoingTags(%v) = %v; want nil", ctx, tg)
1289	}
1290}
1291
1292func TestTrace(t *testing.T) {
1293	b := []byte{5, 2, 4, 3, 1}
1294	ctx := stats.SetTrace(context.Background(), b)
1295	if tr := stats.OutgoingTrace(ctx); !reflect.DeepEqual(tr, b) {
1296		t.Errorf("OutgoingTrace(%v) = %v; want %v", ctx, tr, b)
1297	}
1298	if tr := stats.Trace(ctx); tr != nil {
1299		t.Errorf("Trace(%v) = %v; want nil", ctx, tr)
1300	}
1301
1302	ctx = stats.SetIncomingTrace(context.Background(), b)
1303	if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, b) {
1304		t.Errorf("Trace(%v) = %v; want %v", ctx, tr, b)
1305	}
1306	if tr := stats.OutgoingTrace(ctx); tr != nil {
1307		t.Errorf("OutgoingTrace(%v) = %v; want nil", ctx, tr)
1308	}
1309}
1310