1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package test
20
21import (
22	"context"
23	"crypto/tls"
24	"fmt"
25	"net"
26	"reflect"
27	"strings"
28	"sync"
29	"testing"
30	"time"
31
32	"golang.org/x/net/http2"
33	"google.golang.org/grpc"
34	_ "google.golang.org/grpc/balancer/grpclb"
35	"google.golang.org/grpc/balancer/roundrobin"
36	"google.golang.org/grpc/codes"
37	"google.golang.org/grpc/connectivity"
38	"google.golang.org/grpc/credentials"
39	"google.golang.org/grpc/internal"
40	"google.golang.org/grpc/internal/channelz"
41	"google.golang.org/grpc/internal/stubserver"
42	"google.golang.org/grpc/keepalive"
43	"google.golang.org/grpc/resolver"
44	"google.golang.org/grpc/resolver/manual"
45	"google.golang.org/grpc/status"
46	testpb "google.golang.org/grpc/test/grpc_testing"
47	"google.golang.org/grpc/testdata"
48)
49
// czCleanupWrapper invokes cleanup and, if it returns an error, records that
// error on t as a non-fatal test failure.
func czCleanupWrapper(cleanup func() error, t *testing.T) {
	err := cleanup()
	if err == nil {
		return
	}
	t.Error(err)
}
55
// verifyResultWithDelay polls f until it reports success, sleeping 10ms after
// every unsuccessful attempt, for at most 1000 attempts.
//
// It returns nil as soon as f returns ok == true. When every attempt fails it
// returns the error produced by the last attempt. If f never supplied an
// error, a generic timeout error is returned instead, so that an exhausted
// poll can never be mistaken for success.
func verifyResultWithDelay(f func() (bool, error)) error {
	const (
		maxAttempts  = 1000
		pollInterval = 10 * time.Millisecond
	)
	var lastErr error
	for i := 0; i < maxAttempts; i++ {
		ok, err := f()
		if ok {
			return nil
		}
		lastErr = err
		time.Sleep(pollInterval)
	}
	if lastErr == nil {
		return fmt.Errorf("condition not satisfied after %d attempts", maxAttempts)
	}
	return lastErr
}
67
68func (s) TestCZServerRegistrationAndDeletion(t *testing.T) {
69	testcases := []struct {
70		total  int
71		start  int64
72		max    int64
73		length int64
74		end    bool
75	}{
76		{total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
77		{total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
78		{total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
79		{total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true},
80		{total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
81		{total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
82	}
83
84	for _, c := range testcases {
85		czCleanup := channelz.NewChannelzStorage()
86		defer czCleanupWrapper(czCleanup, t)
87		e := tcpClearRREnv
88		te := newTest(t, e)
89		te.startServers(&testServer{security: e.security}, c.total)
90
91		ss, end := channelz.GetServers(c.start, c.max)
92		if int64(len(ss)) != c.length || end != c.end {
93			t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end)
94		}
95		te.tearDown()
96		ss, end = channelz.GetServers(c.start, c.max)
97		if len(ss) != 0 || !end {
98			t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end)
99		}
100	}
101}
102
103func (s) TestCZGetServer(t *testing.T) {
104	czCleanup := channelz.NewChannelzStorage()
105	defer czCleanupWrapper(czCleanup, t)
106	e := tcpClearRREnv
107	te := newTest(t, e)
108	te.startServer(&testServer{security: e.security})
109	defer te.tearDown()
110
111	ss, _ := channelz.GetServers(0, 0)
112	if len(ss) != 1 {
113		t.Fatalf("there should only be one server, not %d", len(ss))
114	}
115
116	serverID := ss[0].ID
117	srv := channelz.GetServer(serverID)
118	if srv == nil {
119		t.Fatalf("server %d does not exist", serverID)
120	}
121	if srv.ID != serverID {
122		t.Fatalf("server want id %d, but got %d", serverID, srv.ID)
123	}
124
125	te.tearDown()
126
127	if err := verifyResultWithDelay(func() (bool, error) {
128		srv := channelz.GetServer(serverID)
129		if srv != nil {
130			return false, fmt.Errorf("server %d should not exist", serverID)
131		}
132
133		return true, nil
134	}); err != nil {
135		t.Fatal(err)
136	}
137}
138
139func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
140	testcases := []struct {
141		total  int
142		start  int64
143		max    int64
144		length int64
145		end    bool
146	}{
147		{total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
148		{total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
149		{total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
150		{total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true},
151		{total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
152		{total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
153	}
154
155	for _, c := range testcases {
156		czCleanup := channelz.NewChannelzStorage()
157		defer czCleanupWrapper(czCleanup, t)
158		e := tcpClearRREnv
159		te := newTest(t, e)
160		var ccs []*grpc.ClientConn
161		for i := 0; i < c.total; i++ {
162			cc := te.clientConn()
163			te.cc = nil
164			// avoid making next dial blocking
165			te.srvAddr = ""
166			ccs = append(ccs, cc)
167		}
168		if err := verifyResultWithDelay(func() (bool, error) {
169			if tcs, end := channelz.GetTopChannels(c.start, c.max); int64(len(tcs)) != c.length || end != c.end {
170				return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end)
171			}
172			return true, nil
173		}); err != nil {
174			t.Fatal(err)
175		}
176
177		for _, cc := range ccs {
178			cc.Close()
179		}
180
181		if err := verifyResultWithDelay(func() (bool, error) {
182			if tcs, end := channelz.GetTopChannels(c.start, c.max); len(tcs) != 0 || !end {
183				return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end)
184			}
185			return true, nil
186		}); err != nil {
187			t.Fatal(err)
188		}
189		te.tearDown()
190	}
191}
192
193func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) {
194	czCleanup := channelz.NewChannelzStorage()
195	defer czCleanupWrapper(czCleanup, t)
196	// Make dial fails (due to no transport security specified)
197	_, err := grpc.Dial("fake.addr")
198	if err == nil {
199		t.Fatal("expecting dial to fail")
200	}
201	if tcs, end := channelz.GetTopChannels(0, 0); tcs != nil || !end {
202		t.Fatalf("GetTopChannels(0, 0) = %v, %v, want <nil>, true", tcs, end)
203	}
204}
205
206func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
207	czCleanup := channelz.NewChannelzStorage()
208	defer czCleanupWrapper(czCleanup, t)
209	e := tcpClearRREnv
210	// avoid calling API to set balancer type, which will void service config's change of balancer.
211	e.balancer = ""
212	te := newTest(t, e)
213	r := manual.NewBuilderWithScheme("whatever")
214	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
215	r.InitialState(resolver.State{Addresses: resolvedAddrs})
216	te.resolverScheme = r.Scheme()
217	te.clientConn(grpc.WithResolvers(r))
218	defer te.tearDown()
219
220	if err := verifyResultWithDelay(func() (bool, error) {
221		tcs, _ := channelz.GetTopChannels(0, 0)
222		if len(tcs) != 1 {
223			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
224		}
225		if len(tcs[0].NestedChans) != 1 {
226			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
227		}
228		return true, nil
229	}); err != nil {
230		t.Fatal(err)
231	}
232
233	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
234
235	// wait for the shutdown of grpclb balancer
236	if err := verifyResultWithDelay(func() (bool, error) {
237		tcs, _ := channelz.GetTopChannels(0, 0)
238		if len(tcs) != 1 {
239			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
240		}
241		if len(tcs[0].NestedChans) != 0 {
242			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
243		}
244		return true, nil
245	}); err != nil {
246		t.Fatal(err)
247	}
248}
249
250func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
251	czCleanup := channelz.NewChannelzStorage()
252	defer czCleanupWrapper(czCleanup, t)
253	e := tcpClearRREnv
254	num := 3 // number of backends
255	te := newTest(t, e)
256	var svrAddrs []resolver.Address
257	te.startServers(&testServer{security: e.security}, num)
258	r := manual.NewBuilderWithScheme("whatever")
259	for _, a := range te.srvAddrs {
260		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
261	}
262	r.InitialState(resolver.State{Addresses: svrAddrs})
263	te.resolverScheme = r.Scheme()
264	te.clientConn(grpc.WithResolvers(r))
265	defer te.tearDown()
266	// Here, we just wait for all sockets to be up. In the future, if we implement
267	// IDLE, we may need to make several rpc calls to create the sockets.
268	if err := verifyResultWithDelay(func() (bool, error) {
269		tcs, _ := channelz.GetTopChannels(0, 0)
270		if len(tcs) != 1 {
271			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
272		}
273		if len(tcs[0].SubChans) != num {
274			return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
275		}
276		count := 0
277		for k := range tcs[0].SubChans {
278			sc := channelz.GetSubChannel(k)
279			if sc == nil {
280				return false, fmt.Errorf("got <nil> subchannel")
281			}
282			count += len(sc.Sockets)
283		}
284		if count != num {
285			return false, fmt.Errorf("there should be %d sockets not %d", num, count)
286		}
287
288		return true, nil
289	}); err != nil {
290		t.Fatal(err)
291	}
292
293	r.UpdateState(resolver.State{Addresses: svrAddrs[:len(svrAddrs)-1]})
294
295	if err := verifyResultWithDelay(func() (bool, error) {
296		tcs, _ := channelz.GetTopChannels(0, 0)
297		if len(tcs) != 1 {
298			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
299		}
300		if len(tcs[0].SubChans) != num-1 {
301			return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(tcs[0].SubChans))
302		}
303		count := 0
304		for k := range tcs[0].SubChans {
305			sc := channelz.GetSubChannel(k)
306			if sc == nil {
307				return false, fmt.Errorf("got <nil> subchannel")
308			}
309			count += len(sc.Sockets)
310		}
311		if count != num-1 {
312			return false, fmt.Errorf("there should be %d sockets not %d", num-1, count)
313		}
314
315		return true, nil
316	}); err != nil {
317		t.Fatal(err)
318	}
319}
320
321func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
322	testcases := []struct {
323		total  int
324		start  int64
325		max    int64
326		length int64
327		end    bool
328	}{
329		{total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
330		{total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
331		{total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
332		{total: int(channelz.EntryPerPage), start: 1, max: 0, length: channelz.EntryPerPage - 1, end: true},
333		{total: int(channelz.EntryPerPage) + 1, start: channelz.EntryPerPage + 1, max: 0, length: 0, end: true},
334		{total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
335		{total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
336	}
337
338	for _, c := range testcases {
339		czCleanup := channelz.NewChannelzStorage()
340		defer czCleanupWrapper(czCleanup, t)
341		e := tcpClearRREnv
342		te := newTest(t, e)
343		te.startServer(&testServer{security: e.security})
344		var ccs []*grpc.ClientConn
345		for i := 0; i < c.total; i++ {
346			cc := te.clientConn()
347			te.cc = nil
348			ccs = append(ccs, cc)
349		}
350
351		var svrID int64
352		if err := verifyResultWithDelay(func() (bool, error) {
353			ss, _ := channelz.GetServers(0, 0)
354			if len(ss) != 1 {
355				return false, fmt.Errorf("there should only be one server, not %d", len(ss))
356			}
357			if len(ss[0].ListenSockets) != 1 {
358				return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
359			}
360
361			startID := c.start
362			if startID != 0 {
363				ns, _ := channelz.GetServerSockets(ss[0].ID, 0, int64(c.total))
364				if int64(len(ns)) < c.start {
365					return false, fmt.Errorf("there should more than %d sockets, not %d", len(ns), c.start)
366				}
367				startID = ns[c.start-1].ID + 1
368			}
369
370			ns, end := channelz.GetServerSockets(ss[0].ID, startID, c.max)
371			if int64(len(ns)) != c.length || end != c.end {
372				return false, fmt.Errorf("GetServerSockets(%d) = %+v (len of which: %d), end: %+v, want len(GetServerSockets(%d)) = %d, end: %+v", c.start, ns, len(ns), end, c.start, c.length, c.end)
373			}
374
375			svrID = ss[0].ID
376			return true, nil
377		}); err != nil {
378			t.Fatal(err)
379		}
380
381		for _, cc := range ccs {
382			cc.Close()
383		}
384
385		if err := verifyResultWithDelay(func() (bool, error) {
386			ns, _ := channelz.GetServerSockets(svrID, c.start, c.max)
387			if len(ns) != 0 {
388				return false, fmt.Errorf("there should be %d normal sockets not %d", 0, len(ns))
389			}
390			return true, nil
391		}); err != nil {
392			t.Fatal(err)
393		}
394		te.tearDown()
395	}
396}
397
398func (s) TestCZServerListenSocketDeletion(t *testing.T) {
399	czCleanup := channelz.NewChannelzStorage()
400	defer czCleanupWrapper(czCleanup, t)
401	s := grpc.NewServer()
402	lis, err := net.Listen("tcp", "localhost:0")
403	if err != nil {
404		t.Fatalf("failed to listen: %v", err)
405	}
406	go s.Serve(lis)
407	if err := verifyResultWithDelay(func() (bool, error) {
408		ss, _ := channelz.GetServers(0, 0)
409		if len(ss) != 1 {
410			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
411		}
412		if len(ss[0].ListenSockets) != 1 {
413			return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
414		}
415		return true, nil
416	}); err != nil {
417		t.Fatal(err)
418	}
419
420	lis.Close()
421	if err := verifyResultWithDelay(func() (bool, error) {
422		ss, _ := channelz.GetServers(0, 0)
423		if len(ss) != 1 {
424			return false, fmt.Errorf("there should be 1 server, not %d", len(ss))
425		}
426		if len(ss[0].ListenSockets) != 0 {
427			return false, fmt.Errorf("there should only be %d server listen socket, not %d", 0, len(ss[0].ListenSockets))
428		}
429		return true, nil
430	}); err != nil {
431		t.Fatal(err)
432	}
433	s.Stop()
434}
435
436type dummyChannel struct{}
437
438func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
439	return &channelz.ChannelInternalMetric{}
440}
441
442type dummySocket struct{}
443
444func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
445	return &channelz.SocketInternalMetric{}
446}
447
448func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
449	//           +--+TopChan+---+
450	//           |              |
451	//           v              v
452	//    +-+SubChan1+--+   SubChan2
453	//    |             |
454	//    v             v
455	// Socket1       Socket2
456	czCleanup := channelz.NewChannelzStorage()
457	defer czCleanupWrapper(czCleanup, t)
458	topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "")
459	subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
460	subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
461	sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
462	sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
463
464	tcs, _ := channelz.GetTopChannels(0, 0)
465	if tcs == nil || len(tcs) != 1 {
466		t.Fatalf("There should be one TopChannel entry")
467	}
468	if len(tcs[0].SubChans) != 2 {
469		t.Fatalf("There should be two SubChannel entries")
470	}
471	sc := channelz.GetSubChannel(subChanID1)
472	if sc == nil || len(sc.Sockets) != 2 {
473		t.Fatalf("There should be two Socket entries")
474	}
475
476	channelz.RemoveEntry(topChanID)
477	tcs, _ = channelz.GetTopChannels(0, 0)
478	if tcs == nil || len(tcs) != 1 {
479		t.Fatalf("There should be one TopChannel entry")
480	}
481
482	channelz.RemoveEntry(subChanID1)
483	channelz.RemoveEntry(subChanID2)
484	tcs, _ = channelz.GetTopChannels(0, 0)
485	if tcs == nil || len(tcs) != 1 {
486		t.Fatalf("There should be one TopChannel entry")
487	}
488	if len(tcs[0].SubChans) != 1 {
489		t.Fatalf("There should be one SubChannel entry")
490	}
491
492	channelz.RemoveEntry(sktID1)
493	channelz.RemoveEntry(sktID2)
494	tcs, _ = channelz.GetTopChannels(0, 0)
495	if tcs != nil {
496		t.Fatalf("There should be no TopChannel entry")
497	}
498}
499
500func (s) TestCZChannelMetrics(t *testing.T) {
501	czCleanup := channelz.NewChannelzStorage()
502	defer czCleanupWrapper(czCleanup, t)
503	e := tcpClearRREnv
504	num := 3 // number of backends
505	te := newTest(t, e)
506	te.maxClientSendMsgSize = newInt(8)
507	var svrAddrs []resolver.Address
508	te.startServers(&testServer{security: e.security}, num)
509	r := manual.NewBuilderWithScheme("whatever")
510	for _, a := range te.srvAddrs {
511		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
512	}
513	r.InitialState(resolver.State{Addresses: svrAddrs})
514	te.resolverScheme = r.Scheme()
515	cc := te.clientConn(grpc.WithResolvers(r))
516	defer te.tearDown()
517	tc := testpb.NewTestServiceClient(cc)
518	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
519		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
520	}
521
522	const smallSize = 1
523	const largeSize = 8
524
525	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
526	if err != nil {
527		t.Fatal(err)
528	}
529	req := &testpb.SimpleRequest{
530		ResponseType: testpb.PayloadType_COMPRESSABLE,
531		ResponseSize: int32(smallSize),
532		Payload:      largePayload,
533	}
534
535	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
536		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
537	}
538
539	stream, err := tc.FullDuplexCall(context.Background())
540	if err != nil {
541		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
542	}
543	defer stream.CloseSend()
544	// Here, we just wait for all sockets to be up. In the future, if we implement
545	// IDLE, we may need to make several rpc calls to create the sockets.
546	if err := verifyResultWithDelay(func() (bool, error) {
547		tcs, _ := channelz.GetTopChannels(0, 0)
548		if len(tcs) != 1 {
549			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
550		}
551		if len(tcs[0].SubChans) != num {
552			return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
553		}
554		var cst, csu, cf int64
555		for k := range tcs[0].SubChans {
556			sc := channelz.GetSubChannel(k)
557			if sc == nil {
558				return false, fmt.Errorf("got <nil> subchannel")
559			}
560			cst += sc.ChannelData.CallsStarted
561			csu += sc.ChannelData.CallsSucceeded
562			cf += sc.ChannelData.CallsFailed
563		}
564		if cst != 3 {
565			return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst)
566		}
567		if csu != 1 {
568			return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu)
569		}
570		if cf != 1 {
571			return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf)
572		}
573		if tcs[0].ChannelData.CallsStarted != 3 {
574			return false, fmt.Errorf("there should be 3 CallsStarted not %d", tcs[0].ChannelData.CallsStarted)
575		}
576		if tcs[0].ChannelData.CallsSucceeded != 1 {
577			return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", tcs[0].ChannelData.CallsSucceeded)
578		}
579		if tcs[0].ChannelData.CallsFailed != 1 {
580			return false, fmt.Errorf("there should be 1 CallsFailed not %d", tcs[0].ChannelData.CallsFailed)
581		}
582		return true, nil
583	}); err != nil {
584		t.Fatal(err)
585	}
586}
587
588func (s) TestCZServerMetrics(t *testing.T) {
589	czCleanup := channelz.NewChannelzStorage()
590	defer czCleanupWrapper(czCleanup, t)
591	e := tcpClearRREnv
592	te := newTest(t, e)
593	te.maxServerReceiveMsgSize = newInt(8)
594	te.startServer(&testServer{security: e.security})
595	defer te.tearDown()
596	cc := te.clientConn()
597	tc := testpb.NewTestServiceClient(cc)
598	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
599		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
600	}
601
602	const smallSize = 1
603	const largeSize = 8
604
605	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
606	if err != nil {
607		t.Fatal(err)
608	}
609	req := &testpb.SimpleRequest{
610		ResponseType: testpb.PayloadType_COMPRESSABLE,
611		ResponseSize: int32(smallSize),
612		Payload:      largePayload,
613	}
614	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
615		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
616	}
617
618	stream, err := tc.FullDuplexCall(context.Background())
619	if err != nil {
620		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
621	}
622	defer stream.CloseSend()
623
624	if err := verifyResultWithDelay(func() (bool, error) {
625		ss, _ := channelz.GetServers(0, 0)
626		if len(ss) != 1 {
627			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
628		}
629		if ss[0].ServerData.CallsStarted != 3 {
630			return false, fmt.Errorf("there should be 3 CallsStarted not %d", ss[0].ServerData.CallsStarted)
631		}
632		if ss[0].ServerData.CallsSucceeded != 1 {
633			return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", ss[0].ServerData.CallsSucceeded)
634		}
635		if ss[0].ServerData.CallsFailed != 1 {
636			return false, fmt.Errorf("there should be 1 CallsFailed not %d", ss[0].ServerData.CallsFailed)
637		}
638		return true, nil
639	}); err != nil {
640		t.Fatal(err)
641	}
642}
643
644type testServiceClientWrapper struct {
645	testpb.TestServiceClient
646	mu             sync.RWMutex
647	streamsCreated int
648}
649
650func (t *testServiceClientWrapper) getCurrentStreamID() uint32 {
651	t.mu.RLock()
652	defer t.mu.RUnlock()
653	return uint32(2*t.streamsCreated - 1)
654}
655
656func (t *testServiceClientWrapper) EmptyCall(ctx context.Context, in *testpb.Empty, opts ...grpc.CallOption) (*testpb.Empty, error) {
657	t.mu.Lock()
658	defer t.mu.Unlock()
659	t.streamsCreated++
660	return t.TestServiceClient.EmptyCall(ctx, in, opts...)
661}
662
663func (t *testServiceClientWrapper) UnaryCall(ctx context.Context, in *testpb.SimpleRequest, opts ...grpc.CallOption) (*testpb.SimpleResponse, error) {
664	t.mu.Lock()
665	defer t.mu.Unlock()
666	t.streamsCreated++
667	return t.TestServiceClient.UnaryCall(ctx, in, opts...)
668}
669
670func (t *testServiceClientWrapper) StreamingOutputCall(ctx context.Context, in *testpb.StreamingOutputCallRequest, opts ...grpc.CallOption) (testpb.TestService_StreamingOutputCallClient, error) {
671	t.mu.Lock()
672	defer t.mu.Unlock()
673	t.streamsCreated++
674	return t.TestServiceClient.StreamingOutputCall(ctx, in, opts...)
675}
676
677func (t *testServiceClientWrapper) StreamingInputCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_StreamingInputCallClient, error) {
678	t.mu.Lock()
679	defer t.mu.Unlock()
680	t.streamsCreated++
681	return t.TestServiceClient.StreamingInputCall(ctx, opts...)
682}
683
684func (t *testServiceClientWrapper) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_FullDuplexCallClient, error) {
685	t.mu.Lock()
686	defer t.mu.Unlock()
687	t.streamsCreated++
688	return t.TestServiceClient.FullDuplexCall(ctx, opts...)
689}
690
691func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_HalfDuplexCallClient, error) {
692	t.mu.Lock()
693	defer t.mu.Unlock()
694	t.streamsCreated++
695	return t.TestServiceClient.HalfDuplexCall(ctx, opts...)
696}
697
698func doSuccessfulUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
699	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
700	defer cancel()
701	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
702		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
703	}
704}
705
706func doStreamingInputCallWithLargePayload(tc testpb.TestServiceClient, t *testing.T) {
707	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
708	defer cancel()
709	s, err := tc.StreamingInputCall(ctx)
710	if err != nil {
711		t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err)
712	}
713	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10000)
714	if err != nil {
715		t.Fatal(err)
716	}
717	s.Send(&testpb.StreamingInputCallRequest{Payload: payload})
718}
719
720func doServerSideFailedUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
721	const smallSize = 1
722	const largeSize = 2000
723
724	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
725	if err != nil {
726		t.Fatal(err)
727	}
728	req := &testpb.SimpleRequest{
729		ResponseType: testpb.PayloadType_COMPRESSABLE,
730		ResponseSize: int32(smallSize),
731		Payload:      largePayload,
732	}
733	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
734	defer cancel()
735	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
736		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
737	}
738}
739
740func doClientSideInitiatedFailedStream(tc testpb.TestServiceClient, t *testing.T) {
741	ctx, cancel := context.WithCancel(context.Background())
742	stream, err := tc.FullDuplexCall(ctx)
743	if err != nil {
744		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
745	}
746
747	const smallSize = 1
748	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
749	if err != nil {
750		t.Fatal(err)
751	}
752
753	sreq := &testpb.StreamingOutputCallRequest{
754		ResponseType: testpb.PayloadType_COMPRESSABLE,
755		ResponseParameters: []*testpb.ResponseParameters{
756			{Size: smallSize},
757		},
758		Payload: smallPayload,
759	}
760
761	if err := stream.Send(sreq); err != nil {
762		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
763	}
764	if _, err := stream.Recv(); err != nil {
765		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
766	}
767	// By canceling the call, the client will send rst_stream to end the call, and
768	// the stream will failed as a result.
769	cancel()
770}
771
772// This func is to be used to test client side counting of failed streams.
773func doServerSideInitiatedFailedStreamWithRSTStream(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
774	stream, err := tc.FullDuplexCall(context.Background())
775	if err != nil {
776		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
777	}
778
779	const smallSize = 1
780	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
781	if err != nil {
782		t.Fatal(err)
783	}
784
785	sreq := &testpb.StreamingOutputCallRequest{
786		ResponseType: testpb.PayloadType_COMPRESSABLE,
787		ResponseParameters: []*testpb.ResponseParameters{
788			{Size: smallSize},
789		},
790		Payload: smallPayload,
791	}
792
793	if err := stream.Send(sreq); err != nil {
794		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
795	}
796	if _, err := stream.Recv(); err != nil {
797		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
798	}
799
800	rcw := l.getLastConn()
801
802	if rcw != nil {
803		rcw.writeRSTStream(tc.(*testServiceClientWrapper).getCurrentStreamID(), http2.ErrCodeCancel)
804	}
805	if _, err := stream.Recv(); err == nil {
806		t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
807	}
808}
809
810// this func is to be used to test client side counting of failed streams.
811func doServerSideInitiatedFailedStreamWithGoAway(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
812	// This call is just to keep the transport from shutting down (socket will be deleted
813	// in this case, and we will not be able to get metrics).
814	s, err := tc.FullDuplexCall(context.Background())
815	if err != nil {
816		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
817	}
818	if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
819		{
820			Size: 1,
821		},
822	}}); err != nil {
823		t.Fatalf("s.Send() failed with error: %v", err)
824	}
825	if _, err := s.Recv(); err != nil {
826		t.Fatalf("s.Recv() failed with error: %v", err)
827	}
828
829	s, err = tc.FullDuplexCall(context.Background())
830	if err != nil {
831		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
832	}
833	if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
834		{
835			Size: 1,
836		},
837	}}); err != nil {
838		t.Fatalf("s.Send() failed with error: %v", err)
839	}
840	if _, err := s.Recv(); err != nil {
841		t.Fatalf("s.Recv() failed with error: %v", err)
842	}
843
844	rcw := l.getLastConn()
845	if rcw != nil {
846		rcw.writeGoAway(tc.(*testServiceClientWrapper).getCurrentStreamID()-2, http2.ErrCodeCancel, []byte{})
847	}
848	if _, err := s.Recv(); err == nil {
849		t.Fatalf("%v.Recv() = %v, want <non-nil>", s, err)
850	}
851}
852
853func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) {
854	ctx, cancel := context.WithCancel(context.Background())
855	_, err := tc.FullDuplexCall(ctx)
856	if err != nil {
857		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
858	}
859	// Allow for at least 2 keepalives (1s per ping interval)
860	time.Sleep(4 * time.Second)
861	cancel()
862}
863
864func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
865	czCleanup := channelz.NewChannelzStorage()
866	defer czCleanupWrapper(czCleanup, t)
867	e := tcpClearRREnv
868	te := newTest(t, e)
869	te.maxServerReceiveMsgSize = newInt(20)
870	te.maxClientReceiveMsgSize = newInt(20)
871	rcw := te.startServerWithConnControl(&testServer{security: e.security})
872	defer te.tearDown()
873	cc := te.clientConn()
874	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
875
876	doSuccessfulUnaryCall(tc, t)
877	var scID, skID int64
878	if err := verifyResultWithDelay(func() (bool, error) {
879		tchan, _ := channelz.GetTopChannels(0, 0)
880		if len(tchan) != 1 {
881			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
882		}
883		if len(tchan[0].SubChans) != 1 {
884			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
885		}
886
887		for scID = range tchan[0].SubChans {
888			break
889		}
890		sc := channelz.GetSubChannel(scID)
891		if sc == nil {
892			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID)
893		}
894		if len(sc.Sockets) != 1 {
895			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
896		}
897		for skID = range sc.Sockets {
898			break
899		}
900		skt := channelz.GetSocket(skID)
901		sktData := skt.SocketData
902		if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
903			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
904		}
905		return true, nil
906	}); err != nil {
907		t.Fatal(err)
908	}
909
910	doServerSideFailedUnaryCall(tc, t)
911	if err := verifyResultWithDelay(func() (bool, error) {
912		skt := channelz.GetSocket(skID)
913		sktData := skt.SocketData
914		if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 1 {
915			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (2, 2, 2, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
916		}
917		return true, nil
918	}); err != nil {
919		t.Fatal(err)
920	}
921
922	doClientSideInitiatedFailedStream(tc, t)
923	if err := verifyResultWithDelay(func() (bool, error) {
924		skt := channelz.GetSocket(skID)
925		sktData := skt.SocketData
926		if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 3 || sktData.MessagesReceived != 2 {
927			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 3, 2), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
928		}
929		return true, nil
930	}); err != nil {
931		t.Fatal(err)
932	}
933
934	doServerSideInitiatedFailedStreamWithRSTStream(tc, t, rcw)
935	if err := verifyResultWithDelay(func() (bool, error) {
936		skt := channelz.GetSocket(skID)
937		sktData := skt.SocketData
938		if sktData.StreamsStarted != 4 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 2 || sktData.MessagesSent != 4 || sktData.MessagesReceived != 3 {
939			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (4, 2, 2, 4, 3), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
940		}
941		return true, nil
942	}); err != nil {
943		t.Fatal(err)
944	}
945
946	doServerSideInitiatedFailedStreamWithGoAway(tc, t, rcw)
947	if err := verifyResultWithDelay(func() (bool, error) {
948		skt := channelz.GetSocket(skID)
949		sktData := skt.SocketData
950		if sktData.StreamsStarted != 6 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 3 || sktData.MessagesSent != 6 || sktData.MessagesReceived != 5 {
951			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (6, 2, 3, 6, 5), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
952		}
953		return true, nil
954	}); err != nil {
955		t.Fatal(err)
956	}
957}
958
959// This test is to complete TestCZClientSocketMetricsStreamsAndMessagesCount and
960// TestCZServerSocketMetricsStreamsAndMessagesCount by adding the test case of
961// server sending RST_STREAM to client due to client side flow control violation.
962// It is separated from other cases due to setup incompatibly, i.e. max receive
963// size violation will mask flow control violation.
964func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
965	czCleanup := channelz.NewChannelzStorage()
966	defer czCleanupWrapper(czCleanup, t)
967	e := tcpClearRREnv
968	te := newTest(t, e)
969	te.serverInitialWindowSize = 65536
970	// Avoid overflowing connection level flow control window, which will lead to
971	// transport being closed.
972	te.serverInitialConnWindowSize = 65536 * 2
973	ts := &stubserver.StubServer{FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
974		stream.Send(&testpb.StreamingOutputCallResponse{})
975		<-stream.Context().Done()
976		return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled")
977	}}
978	te.startServer(ts)
979	defer te.tearDown()
980	cc, dw := te.clientConnWithConnControl()
981	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
982
983	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
984	stream, err := tc.FullDuplexCall(ctx)
985	if err != nil {
986		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
987	}
988	if _, err := stream.Recv(); err != nil {
989		t.Fatalf("stream.Recv() = %v, want nil", err)
990	}
991	go func() {
992		payload := make([]byte, 16384)
993		for i := 0; i < 6; i++ {
994			dw.getRawConnWrapper().writeRawFrame(http2.FrameData, 0, tc.getCurrentStreamID(), payload)
995		}
996	}()
997	if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted {
998		t.Fatalf("stream.Recv() = %v, want error code: %v", err, codes.ResourceExhausted)
999	}
1000	cancel()
1001
1002	if err := verifyResultWithDelay(func() (bool, error) {
1003		tchan, _ := channelz.GetTopChannels(0, 0)
1004		if len(tchan) != 1 {
1005			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1006		}
1007		if len(tchan[0].SubChans) != 1 {
1008			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
1009		}
1010		var id int64
1011		for id = range tchan[0].SubChans {
1012			break
1013		}
1014		sc := channelz.GetSubChannel(id)
1015		if sc == nil {
1016			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1017		}
1018		if len(sc.Sockets) != 1 {
1019			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
1020		}
1021		for id = range sc.Sockets {
1022			break
1023		}
1024		skt := channelz.GetSocket(id)
1025		sktData := skt.SocketData
1026		if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
1027			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
1028		}
1029		ss, _ := channelz.GetServers(0, 0)
1030		if len(ss) != 1 {
1031			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1032		}
1033
1034		ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
1035		if len(ns) != 1 {
1036			return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
1037		}
1038		sktData = ns[0].SocketData
1039		if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
1040			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
1041		}
1042		return true, nil
1043	}); err != nil {
1044		t.Fatal(err)
1045	}
1046}
1047
1048func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
1049	czCleanup := channelz.NewChannelzStorage()
1050	defer czCleanupWrapper(czCleanup, t)
1051	e := tcpClearRREnv
1052	te := newTest(t, e)
1053	// disable BDP
1054	te.serverInitialWindowSize = 65536
1055	te.serverInitialConnWindowSize = 65536
1056	te.clientInitialWindowSize = 65536
1057	te.clientInitialConnWindowSize = 65536
1058	te.startServer(&testServer{security: e.security})
1059	defer te.tearDown()
1060	cc := te.clientConn()
1061	tc := testpb.NewTestServiceClient(cc)
1062
1063	for i := 0; i < 10; i++ {
1064		doSuccessfulUnaryCall(tc, t)
1065	}
1066
1067	var cliSktID, svrSktID int64
1068	if err := verifyResultWithDelay(func() (bool, error) {
1069		tchan, _ := channelz.GetTopChannels(0, 0)
1070		if len(tchan) != 1 {
1071			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1072		}
1073		if len(tchan[0].SubChans) != 1 {
1074			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
1075		}
1076		var id int64
1077		for id = range tchan[0].SubChans {
1078			break
1079		}
1080		sc := channelz.GetSubChannel(id)
1081		if sc == nil {
1082			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1083		}
1084		if len(sc.Sockets) != 1 {
1085			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
1086		}
1087		for id = range sc.Sockets {
1088			break
1089		}
1090		skt := channelz.GetSocket(id)
1091		sktData := skt.SocketData
1092		// 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
1093		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
1094			return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1095		}
1096		ss, _ := channelz.GetServers(0, 0)
1097		if len(ss) != 1 {
1098			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1099		}
1100		ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
1101		sktData = ns[0].SocketData
1102		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
1103			return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1104		}
1105		cliSktID, svrSktID = id, ss[0].ID
1106		return true, nil
1107	}); err != nil {
1108		t.Fatal(err)
1109	}
1110
1111	doStreamingInputCallWithLargePayload(tc, t)
1112
1113	if err := verifyResultWithDelay(func() (bool, error) {
1114		skt := channelz.GetSocket(cliSktID)
1115		sktData := skt.SocketData
1116		// Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
1117		// Remote: 65536 - 5 (Length-Prefixed-Message size) * 10 - 10011 = 55475
1118		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 {
1119			return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1120		}
1121		ss, _ := channelz.GetServers(0, 0)
1122		if len(ss) != 1 {
1123			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1124		}
1125		ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
1126		sktData = ns[0].SocketData
1127		if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 {
1128			return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1129		}
1130		return true, nil
1131	}); err != nil {
1132		t.Fatal(err)
1133	}
1134
1135	// triggers transport flow control window update on server side, since unacked
1136	// bytes should be larger than limit now. i.e. 50 + 20022 > 65536/4.
1137	doStreamingInputCallWithLargePayload(tc, t)
1138	if err := verifyResultWithDelay(func() (bool, error) {
1139		skt := channelz.GetSocket(cliSktID)
1140		sktData := skt.SocketData
1141		// Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
1142		// Remote: 65536
1143		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 {
1144			return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1145		}
1146		ss, _ := channelz.GetServers(0, 0)
1147		if len(ss) != 1 {
1148			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1149		}
1150		ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
1151		sktData = ns[0].SocketData
1152		if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 {
1153			return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1154		}
1155		return true, nil
1156	}); err != nil {
1157		t.Fatal(err)
1158	}
1159}
1160
1161func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
1162	czCleanup := channelz.NewChannelzStorage()
1163	defer czCleanupWrapper(czCleanup, t)
1164	defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime)
1165	internal.KeepaliveMinPingTime = time.Second
1166	e := tcpClearRREnv
1167	te := newTest(t, e)
1168	te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams(
1169		keepalive.ClientParameters{
1170			Time:                time.Second,
1171			Timeout:             500 * time.Millisecond,
1172			PermitWithoutStream: true,
1173		}))
1174	te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy(
1175		keepalive.EnforcementPolicy{
1176			MinTime:             500 * time.Millisecond,
1177			PermitWithoutStream: true,
1178		}))
1179	te.startServer(&testServer{security: e.security})
1180	te.clientConn() // Dial the server
1181	defer te.tearDown()
1182	if err := verifyResultWithDelay(func() (bool, error) {
1183		tchan, _ := channelz.GetTopChannels(0, 0)
1184		if len(tchan) != 1 {
1185			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1186		}
1187		if len(tchan[0].SubChans) != 1 {
1188			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
1189		}
1190		var id int64
1191		for id = range tchan[0].SubChans {
1192			break
1193		}
1194		sc := channelz.GetSubChannel(id)
1195		if sc == nil {
1196			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1197		}
1198		if len(sc.Sockets) != 1 {
1199			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
1200		}
1201		for id = range sc.Sockets {
1202			break
1203		}
1204		skt := channelz.GetSocket(id)
1205		if skt.SocketData.KeepAlivesSent != 2 {
1206			return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent)
1207		}
1208		return true, nil
1209	}); err != nil {
1210		t.Fatal(err)
1211	}
1212}
1213
1214func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
1215	czCleanup := channelz.NewChannelzStorage()
1216	defer czCleanupWrapper(czCleanup, t)
1217	e := tcpClearRREnv
1218	te := newTest(t, e)
1219	te.maxServerReceiveMsgSize = newInt(20)
1220	te.maxClientReceiveMsgSize = newInt(20)
1221	te.startServer(&testServer{security: e.security})
1222	defer te.tearDown()
1223	cc, _ := te.clientConnWithConnControl()
1224	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
1225
1226	var svrID int64
1227	if err := verifyResultWithDelay(func() (bool, error) {
1228		ss, _ := channelz.GetServers(0, 0)
1229		if len(ss) != 1 {
1230			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1231		}
1232		svrID = ss[0].ID
1233		return true, nil
1234	}); err != nil {
1235		t.Fatal(err)
1236	}
1237
1238	doSuccessfulUnaryCall(tc, t)
1239	if err := verifyResultWithDelay(func() (bool, error) {
1240		ns, _ := channelz.GetServerSockets(svrID, 0, 0)
1241		sktData := ns[0].SocketData
1242		if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
1243			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
1244		}
1245		return true, nil
1246	}); err != nil {
1247		t.Fatal(err)
1248	}
1249
1250	doServerSideFailedUnaryCall(tc, t)
1251	if err := verifyResultWithDelay(func() (bool, error) {
1252		ns, _ := channelz.GetServerSockets(svrID, 0, 0)
1253		sktData := ns[0].SocketData
1254		if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
1255			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
1256		}
1257		return true, nil
1258	}); err != nil {
1259		t.Fatal(err)
1260	}
1261
1262	doClientSideInitiatedFailedStream(tc, t)
1263	if err := verifyResultWithDelay(func() (bool, error) {
1264		ns, _ := channelz.GetServerSockets(svrID, 0, 0)
1265		sktData := ns[0].SocketData
1266		if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 2 {
1267			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
1268		}
1269		return true, nil
1270	}); err != nil {
1271		t.Fatal(err)
1272	}
1273}
1274
1275func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) {
1276	czCleanup := channelz.NewChannelzStorage()
1277	defer czCleanupWrapper(czCleanup, t)
1278	e := tcpClearRREnv
1279	te := newTest(t, e)
1280	// We setup the server keepalive parameters to send one keepalive every
1281	// second, and verify that the actual number of keepalives is very close to
1282	// the number of seconds elapsed in the test.  We had a bug wherein the
1283	// server was sending one keepalive every [Time+Timeout] instead of every
1284	// [Time] period, and since Timeout is configured to a low value here, we
1285	// should be able to verify that the fix works with the above mentioned
1286	// logic.
1287	kpOption := grpc.KeepaliveParams(keepalive.ServerParameters{
1288		Time:    time.Second,
1289		Timeout: 100 * time.Millisecond,
1290	})
1291	te.customServerOptions = append(te.customServerOptions, kpOption)
1292	te.startServer(&testServer{security: e.security})
1293	defer te.tearDown()
1294	cc := te.clientConn()
1295	tc := testpb.NewTestServiceClient(cc)
1296	start := time.Now()
1297	doIdleCallToInvokeKeepAlive(tc, t)
1298
1299	if err := verifyResultWithDelay(func() (bool, error) {
1300		ss, _ := channelz.GetServers(0, 0)
1301		if len(ss) != 1 {
1302			return false, fmt.Errorf("there should be one server, not %d", len(ss))
1303		}
1304		ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
1305		if len(ns) != 1 {
1306			return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
1307		}
1308		wantKeepalivesCount := int64(time.Since(start).Seconds()) - 1
1309		if gotKeepalivesCount := ns[0].SocketData.KeepAlivesSent; gotKeepalivesCount != wantKeepalivesCount {
1310			return false, fmt.Errorf("got keepalivesCount: %v, want keepalivesCount: %v", gotKeepalivesCount, wantKeepalivesCount)
1311		}
1312		return true, nil
1313	}); err != nil {
1314		t.Fatal(err)
1315	}
1316}
1317
// cipherSuites lists the cipher suite names that
// TestCZSocketGetSecurityValueTLS accepts as the StandardName reported in a
// socket's TLS channelz security value. The whole slice is also printed in
// that test's failure message.
var cipherSuites = []string{
	"TLS_RSA_WITH_RC4_128_SHA",
	"TLS_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_RSA_WITH_AES_128_CBC_SHA",
	"TLS_RSA_WITH_AES_256_CBC_SHA",
	"TLS_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
	"TLS_FALLBACK_SCSV",
	"TLS_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
	"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
	"TLS_AES_128_GCM_SHA256",
	"TLS_AES_256_GCM_SHA384",
	"TLS_CHACHA20_POLY1305_SHA256",
}
1346
1347func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) {
1348	czCleanup := channelz.NewChannelzStorage()
1349	defer czCleanupWrapper(czCleanup, t)
1350	e := tcpTLSRREnv
1351	te := newTest(t, e)
1352	te.startServer(&testServer{security: e.security})
1353	defer te.tearDown()
1354	te.clientConn()
1355	if err := verifyResultWithDelay(func() (bool, error) {
1356		tchan, _ := channelz.GetTopChannels(0, 0)
1357		if len(tchan) != 1 {
1358			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1359		}
1360		if len(tchan[0].SubChans) != 1 {
1361			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
1362		}
1363		var id int64
1364		for id = range tchan[0].SubChans {
1365			break
1366		}
1367		sc := channelz.GetSubChannel(id)
1368		if sc == nil {
1369			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1370		}
1371		if len(sc.Sockets) != 1 {
1372			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
1373		}
1374		for id = range sc.Sockets {
1375			break
1376		}
1377		skt := channelz.GetSocket(id)
1378		cert, _ := tls.LoadX509KeyPair(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem"))
1379		securityVal, ok := skt.SocketData.Security.(*credentials.TLSChannelzSecurityValue)
1380		if !ok {
1381			return false, fmt.Errorf("the SocketData.Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.SocketData.Security)
1382		}
1383		if !reflect.DeepEqual(securityVal.RemoteCertificate, cert.Certificate[0]) {
1384			return false, fmt.Errorf("SocketData.Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0])
1385		}
1386		for _, v := range cipherSuites {
1387			if v == securityVal.StandardName {
1388				return true, nil
1389			}
1390		}
1391		return false, fmt.Errorf("SocketData.Security.StandardName got: %v, want it to be one of %v", securityVal.StandardName, cipherSuites)
1392	}); err != nil {
1393		t.Fatal(err)
1394	}
1395}
1396
1397func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
1398	czCleanup := channelz.NewChannelzStorage()
1399	defer czCleanupWrapper(czCleanup, t)
1400	e := tcpClearRREnv
1401	// avoid calling API to set balancer type, which will void service config's change of balancer.
1402	e.balancer = ""
1403	te := newTest(t, e)
1404	r := manual.NewBuilderWithScheme("whatever")
1405	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
1406	r.InitialState(resolver.State{Addresses: resolvedAddrs})
1407	te.resolverScheme = r.Scheme()
1408	te.clientConn(grpc.WithResolvers(r))
1409	defer te.tearDown()
1410	var nestedConn int64
1411	if err := verifyResultWithDelay(func() (bool, error) {
1412		tcs, _ := channelz.GetTopChannels(0, 0)
1413		if len(tcs) != 1 {
1414			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1415		}
1416		if len(tcs[0].NestedChans) != 1 {
1417			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
1418		}
1419		for k := range tcs[0].NestedChans {
1420			nestedConn = k
1421		}
1422		for _, e := range tcs[0].Trace.Events {
1423			if e.RefID == nestedConn && e.RefType != channelz.RefChannel {
1424				return false, fmt.Errorf("nested channel trace event shoud have RefChannel as RefType")
1425			}
1426		}
1427		ncm := channelz.GetChannel(nestedConn)
1428		if ncm.Trace == nil {
1429			return false, fmt.Errorf("trace for nested channel should not be empty")
1430		}
1431		if len(ncm.Trace.Events) == 0 {
1432			return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
1433		}
1434		if ncm.Trace.Events[0].Desc != "Channel Created" {
1435			return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc)
1436		}
1437		return true, nil
1438	}); err != nil {
1439		t.Fatal(err)
1440	}
1441
1442	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
1443
1444	// wait for the shutdown of grpclb balancer
1445	if err := verifyResultWithDelay(func() (bool, error) {
1446		tcs, _ := channelz.GetTopChannels(0, 0)
1447		if len(tcs) != 1 {
1448			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1449		}
1450		if len(tcs[0].NestedChans) != 0 {
1451			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
1452		}
1453		ncm := channelz.GetChannel(nestedConn)
1454		if ncm == nil {
1455			return false, fmt.Errorf("nested channel should still exist due to parent's trace reference")
1456		}
1457		if ncm.Trace == nil {
1458			return false, fmt.Errorf("trace for nested channel should not be empty")
1459		}
1460		if len(ncm.Trace.Events) == 0 {
1461			return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
1462		}
1463		if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" {
1464			return false, fmt.Errorf("the first trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[0].Desc)
1465		}
1466		return true, nil
1467	}); err != nil {
1468		t.Fatal(err)
1469	}
1470}
1471
1472func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
1473	czCleanup := channelz.NewChannelzStorage()
1474	defer czCleanupWrapper(czCleanup, t)
1475	e := tcpClearRREnv
1476	te := newTest(t, e)
1477	te.startServer(&testServer{security: e.security})
1478	r := manual.NewBuilderWithScheme("whatever")
1479	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1480	te.resolverScheme = r.Scheme()
1481	te.clientConn(grpc.WithResolvers(r))
1482	defer te.tearDown()
1483	var subConn int64
1484	// Here, we just wait for all sockets to be up. In the future, if we implement
1485	// IDLE, we may need to make several rpc calls to create the sockets.
1486	if err := verifyResultWithDelay(func() (bool, error) {
1487		tcs, _ := channelz.GetTopChannels(0, 0)
1488		if len(tcs) != 1 {
1489			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1490		}
1491		if len(tcs[0].SubChans) != 1 {
1492			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1493		}
1494		for k := range tcs[0].SubChans {
1495			subConn = k
1496		}
1497		for _, e := range tcs[0].Trace.Events {
1498			if e.RefID == subConn && e.RefType != channelz.RefSubChannel {
1499				return false, fmt.Errorf("subchannel trace event shoud have RefType to be RefSubChannel")
1500			}
1501		}
1502		scm := channelz.GetSubChannel(subConn)
1503		if scm == nil {
1504			return false, fmt.Errorf("subChannel does not exist")
1505		}
1506		if scm.Trace == nil {
1507			return false, fmt.Errorf("trace for subChannel should not be empty")
1508		}
1509		if len(scm.Trace.Events) == 0 {
1510			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1511		}
1512		if scm.Trace.Events[0].Desc != "Subchannel Created" {
1513			return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc)
1514		}
1515		return true, nil
1516	}); err != nil {
1517		t.Fatal(err)
1518	}
1519
1520	// Wait for ready
1521	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1522	defer cancel()
1523	for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
1524		if !te.cc.WaitForStateChange(ctx, src) {
1525			t.Fatalf("timed out waiting for state change.  got %v; want %v", src, connectivity.Ready)
1526		}
1527	}
1528	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
1529	// Wait for not-ready.
1530	for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
1531		if !te.cc.WaitForStateChange(ctx, src) {
1532			t.Fatalf("timed out waiting for state change.  got %v; want !%v", src, connectivity.Ready)
1533		}
1534	}
1535
1536	if err := verifyResultWithDelay(func() (bool, error) {
1537		tcs, _ := channelz.GetTopChannels(0, 0)
1538		if len(tcs) != 1 {
1539			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1540		}
1541		if len(tcs[0].SubChans) != 1 {
1542			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1543		}
1544		scm := channelz.GetSubChannel(subConn)
1545		if scm == nil {
1546			return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
1547		}
1548		if scm.Trace == nil {
1549			return false, fmt.Errorf("trace for SubChannel should not be empty")
1550		}
1551		if len(scm.Trace.Events) == 0 {
1552			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1553		}
1554		if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want {
1555			return false, fmt.Errorf("the last trace event should be %q, not %q", want, got)
1556		}
1557
1558		return true, nil
1559	}); err != nil {
1560		t.Fatal(err)
1561	}
1562}
1563
1564func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
1565	czCleanup := channelz.NewChannelzStorage()
1566	defer czCleanupWrapper(czCleanup, t)
1567	e := tcpClearRREnv
1568	e.balancer = ""
1569	te := newTest(t, e)
1570	te.startServer(&testServer{security: e.security})
1571	r := manual.NewBuilderWithScheme("whatever")
1572	addrs := []resolver.Address{{Addr: te.srvAddr}}
1573	r.InitialState(resolver.State{Addresses: addrs})
1574	te.resolverScheme = r.Scheme()
1575	te.clientConn(grpc.WithResolvers(r))
1576	defer te.tearDown()
1577	var cid int64
1578	// Here, we just wait for all sockets to be up. In the future, if we implement
1579	// IDLE, we may need to make several rpc calls to create the sockets.
1580	if err := verifyResultWithDelay(func() (bool, error) {
1581		tcs, _ := channelz.GetTopChannels(0, 0)
1582		if len(tcs) != 1 {
1583			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1584		}
1585		cid = tcs[0].ID
1586		for i := len(tcs[0].Trace.Events) - 1; i >= 0; i-- {
1587			if strings.Contains(tcs[0].Trace.Events[i].Desc, "resolver returned new addresses") {
1588				break
1589			}
1590			if i == 0 {
1591				return false, fmt.Errorf("events do not contain expected address resolution from empty address state.  Got: %+v", tcs[0].Trace.Events)
1592			}
1593		}
1594		return true, nil
1595	}); err != nil {
1596		t.Fatal(err)
1597	}
1598	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
1599
1600	if err := verifyResultWithDelay(func() (bool, error) {
1601		cm := channelz.GetChannel(cid)
1602		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
1603			if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) {
1604				break
1605			}
1606			if i == 0 {
1607				return false, fmt.Errorf("events do not contain expected address resolution change of LB policy")
1608			}
1609		}
1610		return true, nil
1611	}); err != nil {
1612		t.Fatal(err)
1613	}
1614
1615	newSC := parseCfg(r, `{
1616    "methodConfig": [
1617        {
1618            "name": [
1619                {
1620                    "service": "grpc.testing.TestService",
1621                    "method": "EmptyCall"
1622                }
1623            ],
1624            "waitForReady": false,
1625            "timeout": ".001s"
1626        }
1627    ]
1628}`)
1629	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC})
1630
1631	if err := verifyResultWithDelay(func() (bool, error) {
1632		cm := channelz.GetChannel(cid)
1633
1634		var es []string
1635		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
1636			if strings.Contains(cm.Trace.Events[i].Desc, "service config updated") {
1637				break
1638			}
1639			es = append(es, cm.Trace.Events[i].Desc)
1640			if i == 0 {
1641				return false, fmt.Errorf("events do not contain expected address resolution of new service config\n Events:\n%v", strings.Join(es, "\n"))
1642			}
1643		}
1644		return true, nil
1645	}); err != nil {
1646		t.Fatal(err)
1647	}
1648
1649	r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: newSC})
1650
1651	if err := verifyResultWithDelay(func() (bool, error) {
1652		cm := channelz.GetChannel(cid)
1653		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
1654			if strings.Contains(cm.Trace.Events[i].Desc, "resolver returned an empty address list") {
1655				break
1656			}
1657			if i == 0 {
1658				return false, fmt.Errorf("events do not contain expected address resolution of empty address")
1659			}
1660		}
1661		return true, nil
1662	}); err != nil {
1663		t.Fatal(err)
1664	}
1665}
1666
1667func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
1668	czCleanup := channelz.NewChannelzStorage()
1669	defer czCleanupWrapper(czCleanup, t)
1670	e := tcpClearRREnv
1671	e.balancer = ""
1672	te := newTest(t, e)
1673	te.startServers(&testServer{security: e.security}, 3)
1674	r := manual.NewBuilderWithScheme("whatever")
1675	var svrAddrs []resolver.Address
1676	for _, a := range te.srvAddrs {
1677		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
1678	}
1679	r.InitialState(resolver.State{Addresses: svrAddrs})
1680	te.resolverScheme = r.Scheme()
1681	cc := te.clientConn(grpc.WithResolvers(r))
1682	defer te.tearDown()
1683	tc := testpb.NewTestServiceClient(cc)
1684	// make sure the connection is up
1685	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1686	defer cancel()
1687	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1688		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1689	}
1690	te.srvs[0].Stop()
1691	te.srvs[1].Stop()
1692	// Here, we just wait for all sockets to be up. In the future, if we implement
1693	// IDLE, we may need to make several rpc calls to create the sockets.
1694	if err := verifyResultWithDelay(func() (bool, error) {
1695		tcs, _ := channelz.GetTopChannels(0, 0)
1696		if len(tcs) != 1 {
1697			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1698		}
1699		if len(tcs[0].SubChans) != 1 {
1700			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1701		}
1702		var subConn int64
1703		for k := range tcs[0].SubChans {
1704			subConn = k
1705		}
1706		scm := channelz.GetSubChannel(subConn)
1707		if scm.Trace == nil {
1708			return false, fmt.Errorf("trace for SubChannel should not be empty")
1709		}
1710		if len(scm.Trace.Events) == 0 {
1711			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1712		}
1713		for i := len(scm.Trace.Events) - 1; i >= 0; i-- {
1714			if scm.Trace.Events[i].Desc == fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2]) {
1715				break
1716			}
1717			if i == 0 {
1718				return false, fmt.Errorf("events do not contain expected address resolution of subchannel picked new address")
1719			}
1720		}
1721		return true, nil
1722	}); err != nil {
1723		t.Fatal(err)
1724	}
1725}
1726
1727func (s) TestCZSubChannelConnectivityState(t *testing.T) {
1728	czCleanup := channelz.NewChannelzStorage()
1729	defer czCleanupWrapper(czCleanup, t)
1730	e := tcpClearRREnv
1731	te := newTest(t, e)
1732	te.startServer(&testServer{security: e.security})
1733	r := manual.NewBuilderWithScheme("whatever")
1734	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1735	te.resolverScheme = r.Scheme()
1736	cc := te.clientConn(grpc.WithResolvers(r))
1737	defer te.tearDown()
1738	tc := testpb.NewTestServiceClient(cc)
1739	// make sure the connection is up
1740	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1741	defer cancel()
1742	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1743		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1744	}
1745	var subConn int64
1746	te.srv.Stop()
1747
1748	if err := verifyResultWithDelay(func() (bool, error) {
1749		// we need to obtain the SubChannel id before it gets deleted from Channel's children list (due
1750		// to effect of r.UpdateState(resolver.State{Addresses:[]resolver.Address{}}))
1751		if subConn == 0 {
1752			tcs, _ := channelz.GetTopChannels(0, 0)
1753			if len(tcs) != 1 {
1754				return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1755			}
1756			if len(tcs[0].SubChans) != 1 {
1757				return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1758			}
1759			for k := range tcs[0].SubChans {
1760				// get the SubChannel id for further trace inquiry.
1761				subConn = k
1762			}
1763		}
1764		scm := channelz.GetSubChannel(subConn)
1765		if scm == nil {
1766			return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
1767		}
1768		if scm.Trace == nil {
1769			return false, fmt.Errorf("trace for SubChannel should not be empty")
1770		}
1771		if len(scm.Trace.Events) == 0 {
1772			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1773		}
1774		var ready, connecting, transient, shutdown int
1775		for _, e := range scm.Trace.Events {
1776			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
1777				transient++
1778			}
1779		}
1780		// Make sure the SubChannel has already seen transient failure before shutting it down through
1781		// r.UpdateState(resolver.State{Addresses:[]resolver.Address{}}).
1782		if transient == 0 {
1783			return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
1784		}
1785		transient = 0
1786		r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
1787		for _, e := range scm.Trace.Events {
1788			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
1789				ready++
1790			}
1791			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting) {
1792				connecting++
1793			}
1794			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
1795				transient++
1796			}
1797			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown) {
1798				shutdown++
1799			}
1800		}
1801		// example:
1802		// Subchannel Created
1803		// Subchannel's connectivity state changed to CONNECTING
1804		// Subchannel picked a new address: "localhost:36011"
1805		// Subchannel's connectivity state changed to READY
1806		// Subchannel's connectivity state changed to TRANSIENT_FAILURE
1807		// Subchannel's connectivity state changed to CONNECTING
1808		// Subchannel picked a new address: "localhost:36011"
1809		// Subchannel's connectivity state changed to SHUTDOWN
1810		// Subchannel Deleted
1811		if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 {
1812			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown)
1813		}
1814
1815		return true, nil
1816	}); err != nil {
1817		t.Fatal(err)
1818	}
1819}
1820
1821func (s) TestCZChannelConnectivityState(t *testing.T) {
1822	czCleanup := channelz.NewChannelzStorage()
1823	defer czCleanupWrapper(czCleanup, t)
1824	e := tcpClearRREnv
1825	te := newTest(t, e)
1826	te.startServer(&testServer{security: e.security})
1827	r := manual.NewBuilderWithScheme("whatever")
1828	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1829	te.resolverScheme = r.Scheme()
1830	cc := te.clientConn(grpc.WithResolvers(r))
1831	defer te.tearDown()
1832	tc := testpb.NewTestServiceClient(cc)
1833	// make sure the connection is up
1834	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1835	defer cancel()
1836	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1837		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1838	}
1839	te.srv.Stop()
1840	if err := verifyResultWithDelay(func() (bool, error) {
1841		tcs, _ := channelz.GetTopChannels(0, 0)
1842		if len(tcs) != 1 {
1843			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1844		}
1845
1846		var ready, connecting, transient int
1847		for _, e := range tcs[0].Trace.Events {
1848			if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready) {
1849				ready++
1850			}
1851			if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting) {
1852				connecting++
1853			}
1854			if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure) {
1855				transient++
1856			}
1857		}
1858
1859		// example:
1860		// Channel Created
1861		// Adressses resolved (from empty address state): "localhost:40467"
1862		// SubChannel (id: 4[]) Created
1863		// Channel's connectivity state changed to CONNECTING
1864		// Channel's connectivity state changed to READY
1865		// Channel's connectivity state changed to TRANSIENT_FAILURE
1866		// Channel's connectivity state changed to CONNECTING
1867		// Channel's connectivity state changed to TRANSIENT_FAILURE
1868		if ready != 1 || connecting < 1 || transient < 1 {
1869			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient)
1870		}
1871		return true, nil
1872	}); err != nil {
1873		t.Fatal(err)
1874	}
1875}
1876
1877func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
1878	czCleanup := channelz.NewChannelzStorage()
1879	defer czCleanupWrapper(czCleanup, t)
1880	e := tcpClearRREnv
1881	// avoid newTest using WithBalancerName, which would override service
1882	// config's change of balancer below.
1883	e.balancer = ""
1884	te := newTest(t, e)
1885	channelz.SetMaxTraceEntry(1)
1886	defer channelz.ResetMaxTraceEntryToDefault()
1887	r := manual.NewBuilderWithScheme("whatever")
1888	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
1889	r.InitialState(resolver.State{Addresses: resolvedAddrs})
1890	te.resolverScheme = r.Scheme()
1891	te.clientConn(grpc.WithResolvers(r))
1892	defer te.tearDown()
1893	var nestedConn int64
1894	if err := verifyResultWithDelay(func() (bool, error) {
1895		tcs, _ := channelz.GetTopChannels(0, 0)
1896		if len(tcs) != 1 {
1897			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1898		}
1899		if len(tcs[0].NestedChans) != 1 {
1900			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
1901		}
1902		for k := range tcs[0].NestedChans {
1903			nestedConn = k
1904		}
1905		return true, nil
1906	}); err != nil {
1907		t.Fatal(err)
1908	}
1909
1910	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
1911
1912	// wait for the shutdown of grpclb balancer
1913	if err := verifyResultWithDelay(func() (bool, error) {
1914		tcs, _ := channelz.GetTopChannels(0, 0)
1915		if len(tcs) != 1 {
1916			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1917		}
1918		if len(tcs[0].NestedChans) != 0 {
1919			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
1920		}
1921		return true, nil
1922	}); err != nil {
1923		t.Fatal(err)
1924	}
1925
1926	// If nested channel deletion is last trace event before the next validation, it will fail, as the top channel will hold a reference to it.
1927	// This line forces a trace event on the top channel in that case.
1928	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
1929
1930	// verify that the nested channel no longer exist due to trace referencing it got overwritten.
1931	if err := verifyResultWithDelay(func() (bool, error) {
1932		cm := channelz.GetChannel(nestedConn)
1933		if cm != nil {
1934			return false, fmt.Errorf("nested channel should have been deleted since its parent's trace should not contain any reference to it anymore")
1935		}
1936		return true, nil
1937	}); err != nil {
1938		t.Fatal(err)
1939	}
1940}
1941
1942func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
1943	czCleanup := channelz.NewChannelzStorage()
1944	defer czCleanupWrapper(czCleanup, t)
1945	e := tcpClearRREnv
1946	te := newTest(t, e)
1947	channelz.SetMaxTraceEntry(1)
1948	defer channelz.ResetMaxTraceEntryToDefault()
1949	te.startServer(&testServer{security: e.security})
1950	r := manual.NewBuilderWithScheme("whatever")
1951	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1952	te.resolverScheme = r.Scheme()
1953	te.clientConn(grpc.WithResolvers(r))
1954	defer te.tearDown()
1955	var subConn int64
1956	// Here, we just wait for all sockets to be up. In the future, if we implement
1957	// IDLE, we may need to make several rpc calls to create the sockets.
1958	if err := verifyResultWithDelay(func() (bool, error) {
1959		tcs, _ := channelz.GetTopChannels(0, 0)
1960		if len(tcs) != 1 {
1961			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1962		}
1963		if len(tcs[0].SubChans) != 1 {
1964			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1965		}
1966		for k := range tcs[0].SubChans {
1967			subConn = k
1968		}
1969		return true, nil
1970	}); err != nil {
1971		t.Fatal(err)
1972	}
1973
1974	// Wait for ready
1975	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1976	defer cancel()
1977	for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
1978		if !te.cc.WaitForStateChange(ctx, src) {
1979			t.Fatalf("timed out waiting for state change.  got %v; want %v", src, connectivity.Ready)
1980		}
1981	}
1982	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
1983	// Wait for not-ready.
1984	for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
1985		if !te.cc.WaitForStateChange(ctx, src) {
1986			t.Fatalf("timed out waiting for state change.  got %v; want !%v", src, connectivity.Ready)
1987		}
1988	}
1989
1990	// verify that the subchannel no longer exist due to trace referencing it got overwritten.
1991	if err := verifyResultWithDelay(func() (bool, error) {
1992		cm := channelz.GetChannel(subConn)
1993		if cm != nil {
1994			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
1995		}
1996		return true, nil
1997	}); err != nil {
1998		t.Fatal(err)
1999	}
2000}
2001
2002func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
2003	czCleanup := channelz.NewChannelzStorage()
2004	defer czCleanupWrapper(czCleanup, t)
2005	e := tcpClearRREnv
2006	te := newTest(t, e)
2007	te.startServer(&testServer{security: e.security})
2008	r := manual.NewBuilderWithScheme("whatever")
2009	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
2010	te.resolverScheme = r.Scheme()
2011	te.clientConn(grpc.WithResolvers(r))
2012	var subConn int64
2013	// Here, we just wait for all sockets to be up. In the future, if we implement
2014	// IDLE, we may need to make several rpc calls to create the sockets.
2015	if err := verifyResultWithDelay(func() (bool, error) {
2016		tcs, _ := channelz.GetTopChannels(0, 0)
2017		if len(tcs) != 1 {
2018			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
2019		}
2020		if len(tcs[0].SubChans) != 1 {
2021			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
2022		}
2023		for k := range tcs[0].SubChans {
2024			subConn = k
2025		}
2026		return true, nil
2027	}); err != nil {
2028		t.Fatal(err)
2029	}
2030	te.tearDown()
2031	// verify that the subchannel no longer exist due to parent channel got deleted and its trace cleared.
2032	if err := verifyResultWithDelay(func() (bool, error) {
2033		cm := channelz.GetChannel(subConn)
2034		if cm != nil {
2035			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
2036		}
2037		return true, nil
2038	}); err != nil {
2039		t.Fatal(err)
2040	}
2041}
2042