1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package test
20
21import (
22	"context"
23	"crypto/tls"
24	"fmt"
25	"net"
26	"reflect"
27	"strings"
28	"sync"
29	"testing"
30	"time"
31
32	"golang.org/x/net/http2"
33	"google.golang.org/grpc"
34	_ "google.golang.org/grpc/balancer/grpclb"
35	"google.golang.org/grpc/balancer/roundrobin"
36	"google.golang.org/grpc/codes"
37	"google.golang.org/grpc/connectivity"
38	"google.golang.org/grpc/credentials"
39	"google.golang.org/grpc/internal"
40	"google.golang.org/grpc/internal/channelz"
41	"google.golang.org/grpc/keepalive"
42	"google.golang.org/grpc/resolver"
43	"google.golang.org/grpc/resolver/manual"
44	"google.golang.org/grpc/status"
45	testpb "google.golang.org/grpc/test/grpc_testing"
46	"google.golang.org/grpc/testdata"
47)
48
// czCleanupWrapper invokes cleanup (typically the function returned by
// channelz.NewChannelzStorage) and, if it reports a problem, records that
// error on t without stopping the test.
func czCleanupWrapper(cleanup func() error, t *testing.T) {
	err := cleanup()
	if err == nil {
		return
	}
	t.Error(err)
}
54
// verifyResultWithDelay polls f until it reports success, trying up to 1000
// times with a 10ms pause between attempts (roughly 10s in total).
//
// f returns true once the condition under test holds; otherwise it returns
// false together with an error describing the current mismatch.
//
// verifyResultWithDelay returns nil as soon as f succeeds. If f never
// succeeds, it returns the error from the final attempt, or a generic error
// when that attempt supplied none. A condition that never held is therefore
// never reported as success.
func verifyResultWithDelay(f func() (bool, error)) error {
	const (
		attempts = 1000
		interval = 10 * time.Millisecond
	)
	var err error
	for i := 0; i < attempts; i++ {
		var ok bool
		if ok, err = f(); ok {
			return nil
		}
		// There is no point sleeping after the final attempt.
		if i < attempts-1 {
			time.Sleep(interval)
		}
	}
	if err == nil {
		err = fmt.Errorf("condition not satisfied after %d attempts", attempts)
	}
	return err
}
66
67func (s) TestCZServerRegistrationAndDeletion(t *testing.T) {
68	testcases := []struct {
69		total  int
70		start  int64
71		max    int64
72		length int64
73		end    bool
74	}{
75		{total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
76		{total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
77		{total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
78		{total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true},
79		{total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
80		{total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
81	}
82
83	for _, c := range testcases {
84		czCleanup := channelz.NewChannelzStorage()
85		defer czCleanupWrapper(czCleanup, t)
86		e := tcpClearRREnv
87		te := newTest(t, e)
88		te.startServers(&testServer{security: e.security}, c.total)
89
90		ss, end := channelz.GetServers(c.start, c.max)
91		if int64(len(ss)) != c.length || end != c.end {
92			t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end)
93		}
94		te.tearDown()
95		ss, end = channelz.GetServers(c.start, c.max)
96		if len(ss) != 0 || !end {
97			t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end)
98		}
99	}
100}
101
102func (s) TestCZGetServer(t *testing.T) {
103	czCleanup := channelz.NewChannelzStorage()
104	defer czCleanupWrapper(czCleanup, t)
105	e := tcpClearRREnv
106	te := newTest(t, e)
107	te.startServer(&testServer{security: e.security})
108	defer te.tearDown()
109
110	ss, _ := channelz.GetServers(0, 0)
111	if len(ss) != 1 {
112		t.Fatalf("there should only be one server, not %d", len(ss))
113	}
114
115	serverID := ss[0].ID
116	srv := channelz.GetServer(serverID)
117	if srv == nil {
118		t.Fatalf("server %d does not exist", serverID)
119	}
120	if srv.ID != serverID {
121		t.Fatalf("server want id %d, but got %d", serverID, srv.ID)
122	}
123
124	te.tearDown()
125
126	if err := verifyResultWithDelay(func() (bool, error) {
127		srv := channelz.GetServer(serverID)
128		if srv != nil {
129			return false, fmt.Errorf("server %d should not exist", serverID)
130		}
131
132		return true, nil
133	}); err != nil {
134		t.Fatal(err)
135	}
136}
137
138func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
139	testcases := []struct {
140		total  int
141		start  int64
142		max    int64
143		length int64
144		end    bool
145	}{
146		{total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
147		{total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
148		{total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
149		{total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true},
150		{total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
151		{total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
152	}
153
154	for _, c := range testcases {
155		czCleanup := channelz.NewChannelzStorage()
156		defer czCleanupWrapper(czCleanup, t)
157		e := tcpClearRREnv
158		te := newTest(t, e)
159		var ccs []*grpc.ClientConn
160		for i := 0; i < c.total; i++ {
161			cc := te.clientConn()
162			te.cc = nil
163			// avoid making next dial blocking
164			te.srvAddr = ""
165			ccs = append(ccs, cc)
166		}
167		if err := verifyResultWithDelay(func() (bool, error) {
168			if tcs, end := channelz.GetTopChannels(c.start, c.max); int64(len(tcs)) != c.length || end != c.end {
169				return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end)
170			}
171			return true, nil
172		}); err != nil {
173			t.Fatal(err)
174		}
175
176		for _, cc := range ccs {
177			cc.Close()
178		}
179
180		if err := verifyResultWithDelay(func() (bool, error) {
181			if tcs, end := channelz.GetTopChannels(c.start, c.max); len(tcs) != 0 || !end {
182				return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end)
183			}
184			return true, nil
185		}); err != nil {
186			t.Fatal(err)
187		}
188		te.tearDown()
189	}
190}
191
192func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) {
193	czCleanup := channelz.NewChannelzStorage()
194	defer czCleanupWrapper(czCleanup, t)
195	// Make dial fails (due to no transport security specified)
196	_, err := grpc.Dial("fake.addr")
197	if err == nil {
198		t.Fatal("expecting dial to fail")
199	}
200	if tcs, end := channelz.GetTopChannels(0, 0); tcs != nil || !end {
201		t.Fatalf("GetTopChannels(0, 0) = %v, %v, want <nil>, true", tcs, end)
202	}
203}
204
205func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
206	czCleanup := channelz.NewChannelzStorage()
207	defer czCleanupWrapper(czCleanup, t)
208	e := tcpClearRREnv
209	// avoid calling API to set balancer type, which will void service config's change of balancer.
210	e.balancer = ""
211	te := newTest(t, e)
212	r, cleanup := manual.GenerateAndRegisterManualResolver()
213	defer cleanup()
214	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
215	r.InitialState(resolver.State{Addresses: resolvedAddrs})
216	te.resolverScheme = r.Scheme()
217	te.clientConn()
218	defer te.tearDown()
219
220	if err := verifyResultWithDelay(func() (bool, error) {
221		tcs, _ := channelz.GetTopChannels(0, 0)
222		if len(tcs) != 1 {
223			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
224		}
225		if len(tcs[0].NestedChans) != 1 {
226			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
227		}
228		return true, nil
229	}); err != nil {
230		t.Fatal(err)
231	}
232
233	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
234
235	// wait for the shutdown of grpclb balancer
236	if err := verifyResultWithDelay(func() (bool, error) {
237		tcs, _ := channelz.GetTopChannels(0, 0)
238		if len(tcs) != 1 {
239			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
240		}
241		if len(tcs[0].NestedChans) != 0 {
242			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
243		}
244		return true, nil
245	}); err != nil {
246		t.Fatal(err)
247	}
248}
249
250func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
251	czCleanup := channelz.NewChannelzStorage()
252	defer czCleanupWrapper(czCleanup, t)
253	e := tcpClearRREnv
254	num := 3 // number of backends
255	te := newTest(t, e)
256	var svrAddrs []resolver.Address
257	te.startServers(&testServer{security: e.security}, num)
258	r, cleanup := manual.GenerateAndRegisterManualResolver()
259	defer cleanup()
260	for _, a := range te.srvAddrs {
261		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
262	}
263	r.InitialState(resolver.State{Addresses: svrAddrs})
264	te.resolverScheme = r.Scheme()
265	te.clientConn()
266	defer te.tearDown()
267	// Here, we just wait for all sockets to be up. In the future, if we implement
268	// IDLE, we may need to make several rpc calls to create the sockets.
269	if err := verifyResultWithDelay(func() (bool, error) {
270		tcs, _ := channelz.GetTopChannels(0, 0)
271		if len(tcs) != 1 {
272			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
273		}
274		if len(tcs[0].SubChans) != num {
275			return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
276		}
277		count := 0
278		for k := range tcs[0].SubChans {
279			sc := channelz.GetSubChannel(k)
280			if sc == nil {
281				return false, fmt.Errorf("got <nil> subchannel")
282			}
283			count += len(sc.Sockets)
284		}
285		if count != num {
286			return false, fmt.Errorf("there should be %d sockets not %d", num, count)
287		}
288
289		return true, nil
290	}); err != nil {
291		t.Fatal(err)
292	}
293
294	r.UpdateState(resolver.State{Addresses: svrAddrs[:len(svrAddrs)-1]})
295
296	if err := verifyResultWithDelay(func() (bool, error) {
297		tcs, _ := channelz.GetTopChannels(0, 0)
298		if len(tcs) != 1 {
299			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
300		}
301		if len(tcs[0].SubChans) != num-1 {
302			return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(tcs[0].SubChans))
303		}
304		count := 0
305		for k := range tcs[0].SubChans {
306			sc := channelz.GetSubChannel(k)
307			if sc == nil {
308				return false, fmt.Errorf("got <nil> subchannel")
309			}
310			count += len(sc.Sockets)
311		}
312		if count != num-1 {
313			return false, fmt.Errorf("there should be %d sockets not %d", num-1, count)
314		}
315
316		return true, nil
317	}); err != nil {
318		t.Fatal(err)
319	}
320}
321
322func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
323	testcases := []struct {
324		total  int
325		start  int64
326		max    int64
327		length int64
328		end    bool
329	}{
330		{total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
331		{total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
332		{total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
333		{total: int(channelz.EntryPerPage), start: 1, max: 0, length: channelz.EntryPerPage - 1, end: true},
334		{total: int(channelz.EntryPerPage) + 1, start: channelz.EntryPerPage + 1, max: 0, length: 0, end: true},
335		{total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
336		{total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
337	}
338
339	for _, c := range testcases {
340		czCleanup := channelz.NewChannelzStorage()
341		defer czCleanupWrapper(czCleanup, t)
342		e := tcpClearRREnv
343		te := newTest(t, e)
344		te.startServer(&testServer{security: e.security})
345		var ccs []*grpc.ClientConn
346		for i := 0; i < c.total; i++ {
347			cc := te.clientConn()
348			te.cc = nil
349			ccs = append(ccs, cc)
350		}
351
352		var svrID int64
353		if err := verifyResultWithDelay(func() (bool, error) {
354			ss, _ := channelz.GetServers(0, 0)
355			if len(ss) != 1 {
356				return false, fmt.Errorf("there should only be one server, not %d", len(ss))
357			}
358			if len(ss[0].ListenSockets) != 1 {
359				return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
360			}
361
362			startID := c.start
363			if startID != 0 {
364				ns, _ := channelz.GetServerSockets(ss[0].ID, 0, int64(c.total))
365				if int64(len(ns)) < c.start {
366					return false, fmt.Errorf("there should more than %d sockets, not %d", len(ns), c.start)
367				}
368				startID = ns[c.start-1].ID + 1
369			}
370
371			ns, end := channelz.GetServerSockets(ss[0].ID, startID, c.max)
372			if int64(len(ns)) != c.length || end != c.end {
373				return false, fmt.Errorf("GetServerSockets(%d) = %+v (len of which: %d), end: %+v, want len(GetServerSockets(%d)) = %d, end: %+v", c.start, ns, len(ns), end, c.start, c.length, c.end)
374			}
375
376			svrID = ss[0].ID
377			return true, nil
378		}); err != nil {
379			t.Fatal(err)
380		}
381
382		for _, cc := range ccs {
383			cc.Close()
384		}
385
386		if err := verifyResultWithDelay(func() (bool, error) {
387			ns, _ := channelz.GetServerSockets(svrID, c.start, c.max)
388			if len(ns) != 0 {
389				return false, fmt.Errorf("there should be %d normal sockets not %d", 0, len(ns))
390			}
391			return true, nil
392		}); err != nil {
393			t.Fatal(err)
394		}
395		te.tearDown()
396	}
397}
398
399func (s) TestCZServerListenSocketDeletion(t *testing.T) {
400	czCleanup := channelz.NewChannelzStorage()
401	defer czCleanupWrapper(czCleanup, t)
402	s := grpc.NewServer()
403	lis, err := net.Listen("tcp", "localhost:0")
404	if err != nil {
405		t.Fatalf("failed to listen: %v", err)
406	}
407	go s.Serve(lis)
408	if err := verifyResultWithDelay(func() (bool, error) {
409		ss, _ := channelz.GetServers(0, 0)
410		if len(ss) != 1 {
411			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
412		}
413		if len(ss[0].ListenSockets) != 1 {
414			return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
415		}
416		return true, nil
417	}); err != nil {
418		t.Fatal(err)
419	}
420
421	lis.Close()
422	if err := verifyResultWithDelay(func() (bool, error) {
423		ss, _ := channelz.GetServers(0, 0)
424		if len(ss) != 1 {
425			return false, fmt.Errorf("there should be 1 server, not %d", len(ss))
426		}
427		if len(ss[0].ListenSockets) != 0 {
428			return false, fmt.Errorf("there should only be %d server listen socket, not %d", 0, len(ss[0].ListenSockets))
429		}
430		return true, nil
431	}); err != nil {
432		t.Fatal(err)
433	}
434	s.Stop()
435}
436
437type dummyChannel struct{}
438
439func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
440	return &channelz.ChannelInternalMetric{}
441}
442
443type dummySocket struct{}
444
445func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
446	return &channelz.SocketInternalMetric{}
447}
448
449func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
450	//           +--+TopChan+---+
451	//           |              |
452	//           v              v
453	//    +-+SubChan1+--+   SubChan2
454	//    |             |
455	//    v             v
456	// Socket1       Socket2
457	czCleanup := channelz.NewChannelzStorage()
458	defer czCleanupWrapper(czCleanup, t)
459	topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "")
460	subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
461	subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
462	sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
463	sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
464
465	tcs, _ := channelz.GetTopChannels(0, 0)
466	if tcs == nil || len(tcs) != 1 {
467		t.Fatalf("There should be one TopChannel entry")
468	}
469	if len(tcs[0].SubChans) != 2 {
470		t.Fatalf("There should be two SubChannel entries")
471	}
472	sc := channelz.GetSubChannel(subChanID1)
473	if sc == nil || len(sc.Sockets) != 2 {
474		t.Fatalf("There should be two Socket entries")
475	}
476
477	channelz.RemoveEntry(topChanID)
478	tcs, _ = channelz.GetTopChannels(0, 0)
479	if tcs == nil || len(tcs) != 1 {
480		t.Fatalf("There should be one TopChannel entry")
481	}
482
483	channelz.RemoveEntry(subChanID1)
484	channelz.RemoveEntry(subChanID2)
485	tcs, _ = channelz.GetTopChannels(0, 0)
486	if tcs == nil || len(tcs) != 1 {
487		t.Fatalf("There should be one TopChannel entry")
488	}
489	if len(tcs[0].SubChans) != 1 {
490		t.Fatalf("There should be one SubChannel entry")
491	}
492
493	channelz.RemoveEntry(sktID1)
494	channelz.RemoveEntry(sktID2)
495	tcs, _ = channelz.GetTopChannels(0, 0)
496	if tcs != nil {
497		t.Fatalf("There should be no TopChannel entry")
498	}
499}
500
501func (s) TestCZChannelMetrics(t *testing.T) {
502	czCleanup := channelz.NewChannelzStorage()
503	defer czCleanupWrapper(czCleanup, t)
504	e := tcpClearRREnv
505	num := 3 // number of backends
506	te := newTest(t, e)
507	te.maxClientSendMsgSize = newInt(8)
508	var svrAddrs []resolver.Address
509	te.startServers(&testServer{security: e.security}, num)
510	r, cleanup := manual.GenerateAndRegisterManualResolver()
511	defer cleanup()
512	for _, a := range te.srvAddrs {
513		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
514	}
515	r.InitialState(resolver.State{Addresses: svrAddrs})
516	te.resolverScheme = r.Scheme()
517	cc := te.clientConn()
518	defer te.tearDown()
519	tc := testpb.NewTestServiceClient(cc)
520	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
521		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
522	}
523
524	const smallSize = 1
525	const largeSize = 8
526
527	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
528	if err != nil {
529		t.Fatal(err)
530	}
531	req := &testpb.SimpleRequest{
532		ResponseType: testpb.PayloadType_COMPRESSABLE,
533		ResponseSize: int32(smallSize),
534		Payload:      largePayload,
535	}
536
537	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
538		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
539	}
540
541	stream, err := tc.FullDuplexCall(context.Background())
542	if err != nil {
543		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
544	}
545	defer stream.CloseSend()
546	// Here, we just wait for all sockets to be up. In the future, if we implement
547	// IDLE, we may need to make several rpc calls to create the sockets.
548	if err := verifyResultWithDelay(func() (bool, error) {
549		tcs, _ := channelz.GetTopChannels(0, 0)
550		if len(tcs) != 1 {
551			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
552		}
553		if len(tcs[0].SubChans) != num {
554			return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
555		}
556		var cst, csu, cf int64
557		for k := range tcs[0].SubChans {
558			sc := channelz.GetSubChannel(k)
559			if sc == nil {
560				return false, fmt.Errorf("got <nil> subchannel")
561			}
562			cst += sc.ChannelData.CallsStarted
563			csu += sc.ChannelData.CallsSucceeded
564			cf += sc.ChannelData.CallsFailed
565		}
566		if cst != 3 {
567			return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst)
568		}
569		if csu != 1 {
570			return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu)
571		}
572		if cf != 1 {
573			return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf)
574		}
575		if tcs[0].ChannelData.CallsStarted != 3 {
576			return false, fmt.Errorf("there should be 3 CallsStarted not %d", tcs[0].ChannelData.CallsStarted)
577		}
578		if tcs[0].ChannelData.CallsSucceeded != 1 {
579			return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", tcs[0].ChannelData.CallsSucceeded)
580		}
581		if tcs[0].ChannelData.CallsFailed != 1 {
582			return false, fmt.Errorf("there should be 1 CallsFailed not %d", tcs[0].ChannelData.CallsFailed)
583		}
584		return true, nil
585	}); err != nil {
586		t.Fatal(err)
587	}
588}
589
590func (s) TestCZServerMetrics(t *testing.T) {
591	czCleanup := channelz.NewChannelzStorage()
592	defer czCleanupWrapper(czCleanup, t)
593	e := tcpClearRREnv
594	te := newTest(t, e)
595	te.maxServerReceiveMsgSize = newInt(8)
596	te.startServer(&testServer{security: e.security})
597	defer te.tearDown()
598	cc := te.clientConn()
599	tc := testpb.NewTestServiceClient(cc)
600	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
601		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
602	}
603
604	const smallSize = 1
605	const largeSize = 8
606
607	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
608	if err != nil {
609		t.Fatal(err)
610	}
611	req := &testpb.SimpleRequest{
612		ResponseType: testpb.PayloadType_COMPRESSABLE,
613		ResponseSize: int32(smallSize),
614		Payload:      largePayload,
615	}
616	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
617		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
618	}
619
620	stream, err := tc.FullDuplexCall(context.Background())
621	if err != nil {
622		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
623	}
624	defer stream.CloseSend()
625
626	if err := verifyResultWithDelay(func() (bool, error) {
627		ss, _ := channelz.GetServers(0, 0)
628		if len(ss) != 1 {
629			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
630		}
631		if ss[0].ServerData.CallsStarted != 3 {
632			return false, fmt.Errorf("there should be 3 CallsStarted not %d", ss[0].ServerData.CallsStarted)
633		}
634		if ss[0].ServerData.CallsSucceeded != 1 {
635			return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", ss[0].ServerData.CallsSucceeded)
636		}
637		if ss[0].ServerData.CallsFailed != 1 {
638			return false, fmt.Errorf("there should be 1 CallsFailed not %d", ss[0].ServerData.CallsFailed)
639		}
640		return true, nil
641	}); err != nil {
642		t.Fatal(err)
643	}
644}
645
646type testServiceClientWrapper struct {
647	testpb.TestServiceClient
648	mu             sync.RWMutex
649	streamsCreated int
650}
651
652func (t *testServiceClientWrapper) getCurrentStreamID() uint32 {
653	t.mu.RLock()
654	defer t.mu.RUnlock()
655	return uint32(2*t.streamsCreated - 1)
656}
657
658func (t *testServiceClientWrapper) EmptyCall(ctx context.Context, in *testpb.Empty, opts ...grpc.CallOption) (*testpb.Empty, error) {
659	t.mu.Lock()
660	defer t.mu.Unlock()
661	t.streamsCreated++
662	return t.TestServiceClient.EmptyCall(ctx, in, opts...)
663}
664
665func (t *testServiceClientWrapper) UnaryCall(ctx context.Context, in *testpb.SimpleRequest, opts ...grpc.CallOption) (*testpb.SimpleResponse, error) {
666	t.mu.Lock()
667	defer t.mu.Unlock()
668	t.streamsCreated++
669	return t.TestServiceClient.UnaryCall(ctx, in, opts...)
670}
671
672func (t *testServiceClientWrapper) StreamingOutputCall(ctx context.Context, in *testpb.StreamingOutputCallRequest, opts ...grpc.CallOption) (testpb.TestService_StreamingOutputCallClient, error) {
673	t.mu.Lock()
674	defer t.mu.Unlock()
675	t.streamsCreated++
676	return t.TestServiceClient.StreamingOutputCall(ctx, in, opts...)
677}
678
679func (t *testServiceClientWrapper) StreamingInputCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_StreamingInputCallClient, error) {
680	t.mu.Lock()
681	defer t.mu.Unlock()
682	t.streamsCreated++
683	return t.TestServiceClient.StreamingInputCall(ctx, opts...)
684}
685
686func (t *testServiceClientWrapper) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_FullDuplexCallClient, error) {
687	t.mu.Lock()
688	defer t.mu.Unlock()
689	t.streamsCreated++
690	return t.TestServiceClient.FullDuplexCall(ctx, opts...)
691}
692
693func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_HalfDuplexCallClient, error) {
694	t.mu.Lock()
695	defer t.mu.Unlock()
696	t.streamsCreated++
697	return t.TestServiceClient.HalfDuplexCall(ctx, opts...)
698}
699
700func doSuccessfulUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
701	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
702		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
703	}
704}
705
706func doStreamingInputCallWithLargePayload(tc testpb.TestServiceClient, t *testing.T) {
707	s, err := tc.StreamingInputCall(context.Background())
708	if err != nil {
709		t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err)
710	}
711	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10000)
712	if err != nil {
713		t.Fatal(err)
714	}
715	s.Send(&testpb.StreamingInputCallRequest{Payload: payload})
716}
717
718func doServerSideFailedUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
719	const smallSize = 1
720	const largeSize = 2000
721
722	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
723	if err != nil {
724		t.Fatal(err)
725	}
726	req := &testpb.SimpleRequest{
727		ResponseType: testpb.PayloadType_COMPRESSABLE,
728		ResponseSize: int32(smallSize),
729		Payload:      largePayload,
730	}
731	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
732		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
733	}
734}
735
736func doClientSideInitiatedFailedStream(tc testpb.TestServiceClient, t *testing.T) {
737	ctx, cancel := context.WithCancel(context.Background())
738	stream, err := tc.FullDuplexCall(ctx)
739	if err != nil {
740		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
741	}
742
743	const smallSize = 1
744	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
745	if err != nil {
746		t.Fatal(err)
747	}
748
749	sreq := &testpb.StreamingOutputCallRequest{
750		ResponseType: testpb.PayloadType_COMPRESSABLE,
751		ResponseParameters: []*testpb.ResponseParameters{
752			{Size: smallSize},
753		},
754		Payload: smallPayload,
755	}
756
757	if err := stream.Send(sreq); err != nil {
758		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
759	}
760	if _, err := stream.Recv(); err != nil {
761		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
762	}
763	// By canceling the call, the client will send rst_stream to end the call, and
764	// the stream will failed as a result.
765	cancel()
766}
767
768// This func is to be used to test client side counting of failed streams.
769func doServerSideInitiatedFailedStreamWithRSTStream(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
770	stream, err := tc.FullDuplexCall(context.Background())
771	if err != nil {
772		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
773	}
774
775	const smallSize = 1
776	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
777	if err != nil {
778		t.Fatal(err)
779	}
780
781	sreq := &testpb.StreamingOutputCallRequest{
782		ResponseType: testpb.PayloadType_COMPRESSABLE,
783		ResponseParameters: []*testpb.ResponseParameters{
784			{Size: smallSize},
785		},
786		Payload: smallPayload,
787	}
788
789	if err := stream.Send(sreq); err != nil {
790		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
791	}
792	if _, err := stream.Recv(); err != nil {
793		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
794	}
795
796	rcw := l.getLastConn()
797
798	if rcw != nil {
799		rcw.writeRSTStream(tc.(*testServiceClientWrapper).getCurrentStreamID(), http2.ErrCodeCancel)
800	}
801	if _, err := stream.Recv(); err == nil {
802		t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
803	}
804}
805
806// this func is to be used to test client side counting of failed streams.
807func doServerSideInitiatedFailedStreamWithGoAway(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
808	// This call is just to keep the transport from shutting down (socket will be deleted
809	// in this case, and we will not be able to get metrics).
810	s, err := tc.FullDuplexCall(context.Background())
811	if err != nil {
812		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
813	}
814	if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
815		{
816			Size: 1,
817		},
818	}}); err != nil {
819		t.Fatalf("s.Send() failed with error: %v", err)
820	}
821	if _, err := s.Recv(); err != nil {
822		t.Fatalf("s.Recv() failed with error: %v", err)
823	}
824
825	s, err = tc.FullDuplexCall(context.Background())
826	if err != nil {
827		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
828	}
829	if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
830		{
831			Size: 1,
832		},
833	}}); err != nil {
834		t.Fatalf("s.Send() failed with error: %v", err)
835	}
836	if _, err := s.Recv(); err != nil {
837		t.Fatalf("s.Recv() failed with error: %v", err)
838	}
839
840	rcw := l.getLastConn()
841	if rcw != nil {
842		rcw.writeGoAway(tc.(*testServiceClientWrapper).getCurrentStreamID()-2, http2.ErrCodeCancel, []byte{})
843	}
844	if _, err := s.Recv(); err == nil {
845		t.Fatalf("%v.Recv() = %v, want <non-nil>", s, err)
846	}
847}
848
849func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) {
850	ctx, cancel := context.WithCancel(context.Background())
851	_, err := tc.FullDuplexCall(ctx)
852	if err != nil {
853		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
854	}
855	// Allow for at least 2 keepalives (1s per ping interval)
856	time.Sleep(4 * time.Second)
857	cancel()
858}
859
860func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
861	czCleanup := channelz.NewChannelzStorage()
862	defer czCleanupWrapper(czCleanup, t)
863	e := tcpClearRREnv
864	te := newTest(t, e)
865	te.maxServerReceiveMsgSize = newInt(20)
866	te.maxClientReceiveMsgSize = newInt(20)
867	rcw := te.startServerWithConnControl(&testServer{security: e.security})
868	defer te.tearDown()
869	cc := te.clientConn()
870	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
871
872	doSuccessfulUnaryCall(tc, t)
873	var scID, skID int64
874	if err := verifyResultWithDelay(func() (bool, error) {
875		tchan, _ := channelz.GetTopChannels(0, 0)
876		if len(tchan) != 1 {
877			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
878		}
879		if len(tchan[0].SubChans) != 1 {
880			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
881		}
882
883		for scID = range tchan[0].SubChans {
884			break
885		}
886		sc := channelz.GetSubChannel(scID)
887		if sc == nil {
888			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID)
889		}
890		if len(sc.Sockets) != 1 {
891			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
892		}
893		for skID = range sc.Sockets {
894			break
895		}
896		skt := channelz.GetSocket(skID)
897		sktData := skt.SocketData
898		if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
899			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
900		}
901		return true, nil
902	}); err != nil {
903		t.Fatal(err)
904	}
905
906	doServerSideFailedUnaryCall(tc, t)
907	if err := verifyResultWithDelay(func() (bool, error) {
908		skt := channelz.GetSocket(skID)
909		sktData := skt.SocketData
910		if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 1 {
911			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (2, 2, 2, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
912		}
913		return true, nil
914	}); err != nil {
915		t.Fatal(err)
916	}
917
918	doClientSideInitiatedFailedStream(tc, t)
919	if err := verifyResultWithDelay(func() (bool, error) {
920		skt := channelz.GetSocket(skID)
921		sktData := skt.SocketData
922		if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 3 || sktData.MessagesReceived != 2 {
923			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 3, 2), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
924		}
925		return true, nil
926	}); err != nil {
927		t.Fatal(err)
928	}
929
930	doServerSideInitiatedFailedStreamWithRSTStream(tc, t, rcw)
931	if err := verifyResultWithDelay(func() (bool, error) {
932		skt := channelz.GetSocket(skID)
933		sktData := skt.SocketData
934		if sktData.StreamsStarted != 4 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 2 || sktData.MessagesSent != 4 || sktData.MessagesReceived != 3 {
935			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (4, 2, 2, 4, 3), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
936		}
937		return true, nil
938	}); err != nil {
939		t.Fatal(err)
940	}
941
942	doServerSideInitiatedFailedStreamWithGoAway(tc, t, rcw)
943	if err := verifyResultWithDelay(func() (bool, error) {
944		skt := channelz.GetSocket(skID)
945		sktData := skt.SocketData
946		if sktData.StreamsStarted != 6 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 3 || sktData.MessagesSent != 6 || sktData.MessagesReceived != 5 {
947			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (6, 2, 3, 6, 5), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
948		}
949		return true, nil
950	}); err != nil {
951		t.Fatal(err)
952	}
953}
954
955// This test is to complete TestCZClientSocketMetricsStreamsAndMessagesCount and
956// TestCZServerSocketMetricsStreamsAndMessagesCount by adding the test case of
957// server sending RST_STREAM to client due to client side flow control violation.
958// It is separated from other cases due to setup incompatibly, i.e. max receive
959// size violation will mask flow control violation.
960func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
961	czCleanup := channelz.NewChannelzStorage()
962	defer czCleanupWrapper(czCleanup, t)
963	e := tcpClearRREnv
964	te := newTest(t, e)
965	te.serverInitialWindowSize = 65536
966	// Avoid overflowing connection level flow control window, which will lead to
967	// transport being closed.
968	te.serverInitialConnWindowSize = 65536 * 2
969	ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
970		stream.Send(&testpb.StreamingOutputCallResponse{})
971		<-stream.Context().Done()
972		return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled")
973	}}
974	te.startServer(ts)
975	defer te.tearDown()
976	cc, dw := te.clientConnWithConnControl()
977	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
978
979	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
980	stream, err := tc.FullDuplexCall(ctx)
981	if err != nil {
982		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
983	}
984	if _, err := stream.Recv(); err != nil {
985		t.Fatalf("stream.Recv() = %v, want nil", err)
986	}
987	go func() {
988		payload := make([]byte, 16384)
989		for i := 0; i < 6; i++ {
990			dw.getRawConnWrapper().writeRawFrame(http2.FrameData, 0, tc.getCurrentStreamID(), payload)
991		}
992	}()
993	if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted {
994		t.Fatalf("stream.Recv() = %v, want error code: %v", err, codes.ResourceExhausted)
995	}
996	cancel()
997
998	if err := verifyResultWithDelay(func() (bool, error) {
999		tchan, _ := channelz.GetTopChannels(0, 0)
1000		if len(tchan) != 1 {
1001			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1002		}
1003		if len(tchan[0].SubChans) != 1 {
1004			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
1005		}
1006		var id int64
1007		for id = range tchan[0].SubChans {
1008			break
1009		}
1010		sc := channelz.GetSubChannel(id)
1011		if sc == nil {
1012			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1013		}
1014		if len(sc.Sockets) != 1 {
1015			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
1016		}
1017		for id = range sc.Sockets {
1018			break
1019		}
1020		skt := channelz.GetSocket(id)
1021		sktData := skt.SocketData
1022		if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
1023			return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
1024		}
1025		ss, _ := channelz.GetServers(0, 0)
1026		if len(ss) != 1 {
1027			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1028		}
1029
1030		ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
1031		if len(ns) != 1 {
1032			return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
1033		}
1034		sktData = ns[0].SocketData
1035		if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
1036			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
1037		}
1038		return true, nil
1039	}); err != nil {
1040		t.Fatal(err)
1041	}
1042}
1043
1044func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
1045	czCleanup := channelz.NewChannelzStorage()
1046	defer czCleanupWrapper(czCleanup, t)
1047	e := tcpClearRREnv
1048	te := newTest(t, e)
1049	// disable BDP
1050	te.serverInitialWindowSize = 65536
1051	te.serverInitialConnWindowSize = 65536
1052	te.clientInitialWindowSize = 65536
1053	te.clientInitialConnWindowSize = 65536
1054	te.startServer(&testServer{security: e.security})
1055	defer te.tearDown()
1056	cc := te.clientConn()
1057	tc := testpb.NewTestServiceClient(cc)
1058
1059	for i := 0; i < 10; i++ {
1060		doSuccessfulUnaryCall(tc, t)
1061	}
1062
1063	var cliSktID, svrSktID int64
1064	if err := verifyResultWithDelay(func() (bool, error) {
1065		tchan, _ := channelz.GetTopChannels(0, 0)
1066		if len(tchan) != 1 {
1067			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1068		}
1069		if len(tchan[0].SubChans) != 1 {
1070			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
1071		}
1072		var id int64
1073		for id = range tchan[0].SubChans {
1074			break
1075		}
1076		sc := channelz.GetSubChannel(id)
1077		if sc == nil {
1078			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1079		}
1080		if len(sc.Sockets) != 1 {
1081			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
1082		}
1083		for id = range sc.Sockets {
1084			break
1085		}
1086		skt := channelz.GetSocket(id)
1087		sktData := skt.SocketData
1088		// 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
1089		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
1090			return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1091		}
1092		ss, _ := channelz.GetServers(0, 0)
1093		if len(ss) != 1 {
1094			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1095		}
1096		ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
1097		sktData = ns[0].SocketData
1098		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
1099			return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1100		}
1101		cliSktID, svrSktID = id, ss[0].ID
1102		return true, nil
1103	}); err != nil {
1104		t.Fatal(err)
1105	}
1106
1107	doStreamingInputCallWithLargePayload(tc, t)
1108
1109	if err := verifyResultWithDelay(func() (bool, error) {
1110		skt := channelz.GetSocket(cliSktID)
1111		sktData := skt.SocketData
1112		// Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
1113		// Remote: 65536 - 5 (Length-Prefixed-Message size) * 10 - 10011 = 55475
1114		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 {
1115			return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1116		}
1117		ss, _ := channelz.GetServers(0, 0)
1118		if len(ss) != 1 {
1119			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1120		}
1121		ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
1122		sktData = ns[0].SocketData
1123		if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 {
1124			return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1125		}
1126		return true, nil
1127	}); err != nil {
1128		t.Fatal(err)
1129	}
1130
1131	// triggers transport flow control window update on server side, since unacked
1132	// bytes should be larger than limit now. i.e. 50 + 20022 > 65536/4.
1133	doStreamingInputCallWithLargePayload(tc, t)
1134	if err := verifyResultWithDelay(func() (bool, error) {
1135		skt := channelz.GetSocket(cliSktID)
1136		sktData := skt.SocketData
1137		// Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
1138		// Remote: 65536
1139		if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 {
1140			return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1141		}
1142		ss, _ := channelz.GetServers(0, 0)
1143		if len(ss) != 1 {
1144			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1145		}
1146		ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
1147		sktData = ns[0].SocketData
1148		if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 {
1149			return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1150		}
1151		return true, nil
1152	}); err != nil {
1153		t.Fatal(err)
1154	}
1155}
1156
1157func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
1158	czCleanup := channelz.NewChannelzStorage()
1159	defer czCleanupWrapper(czCleanup, t)
1160	defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime)
1161	internal.KeepaliveMinPingTime = time.Second
1162	e := tcpClearRREnv
1163	te := newTest(t, e)
1164	te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams(
1165		keepalive.ClientParameters{
1166			Time:                time.Second,
1167			Timeout:             500 * time.Millisecond,
1168			PermitWithoutStream: true,
1169		}))
1170	te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy(
1171		keepalive.EnforcementPolicy{
1172			MinTime:             500 * time.Millisecond,
1173			PermitWithoutStream: true,
1174		}))
1175	te.startServer(&testServer{security: e.security})
1176	te.clientConn() // Dial the server
1177	defer te.tearDown()
1178	if err := verifyResultWithDelay(func() (bool, error) {
1179		tchan, _ := channelz.GetTopChannels(0, 0)
1180		if len(tchan) != 1 {
1181			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1182		}
1183		if len(tchan[0].SubChans) != 1 {
1184			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
1185		}
1186		var id int64
1187		for id = range tchan[0].SubChans {
1188			break
1189		}
1190		sc := channelz.GetSubChannel(id)
1191		if sc == nil {
1192			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1193		}
1194		if len(sc.Sockets) != 1 {
1195			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
1196		}
1197		for id = range sc.Sockets {
1198			break
1199		}
1200		skt := channelz.GetSocket(id)
1201		if skt.SocketData.KeepAlivesSent != 2 {
1202			return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent)
1203		}
1204		return true, nil
1205	}); err != nil {
1206		t.Fatal(err)
1207	}
1208}
1209
1210func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
1211	czCleanup := channelz.NewChannelzStorage()
1212	defer czCleanupWrapper(czCleanup, t)
1213	e := tcpClearRREnv
1214	te := newTest(t, e)
1215	te.maxServerReceiveMsgSize = newInt(20)
1216	te.maxClientReceiveMsgSize = newInt(20)
1217	te.startServer(&testServer{security: e.security})
1218	defer te.tearDown()
1219	cc, _ := te.clientConnWithConnControl()
1220	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
1221
1222	var svrID int64
1223	if err := verifyResultWithDelay(func() (bool, error) {
1224		ss, _ := channelz.GetServers(0, 0)
1225		if len(ss) != 1 {
1226			return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1227		}
1228		svrID = ss[0].ID
1229		return true, nil
1230	}); err != nil {
1231		t.Fatal(err)
1232	}
1233
1234	doSuccessfulUnaryCall(tc, t)
1235	if err := verifyResultWithDelay(func() (bool, error) {
1236		ns, _ := channelz.GetServerSockets(svrID, 0, 0)
1237		sktData := ns[0].SocketData
1238		if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
1239			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
1240		}
1241		return true, nil
1242	}); err != nil {
1243		t.Fatal(err)
1244	}
1245
1246	doServerSideFailedUnaryCall(tc, t)
1247	if err := verifyResultWithDelay(func() (bool, error) {
1248		ns, _ := channelz.GetServerSockets(svrID, 0, 0)
1249		sktData := ns[0].SocketData
1250		if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
1251			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
1252		}
1253		return true, nil
1254	}); err != nil {
1255		t.Fatal(err)
1256	}
1257
1258	doClientSideInitiatedFailedStream(tc, t)
1259	if err := verifyResultWithDelay(func() (bool, error) {
1260		ns, _ := channelz.GetServerSockets(svrID, 0, 0)
1261		sktData := ns[0].SocketData
1262		if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 2 {
1263			return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
1264		}
1265		return true, nil
1266	}); err != nil {
1267		t.Fatal(err)
1268	}
1269}
1270
1271func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) {
1272	czCleanup := channelz.NewChannelzStorage()
1273	defer czCleanupWrapper(czCleanup, t)
1274	e := tcpClearRREnv
1275	te := newTest(t, e)
1276	// We setup the server keepalive parameters to send one keepalive every
1277	// second, and verify that the actual number of keepalives is very close to
1278	// the number of seconds elapsed in the test.  We had a bug wherein the
1279	// server was sending one keepalive every [Time+Timeout] instead of every
1280	// [Time] period, and since Timeout is configured to a low value here, we
1281	// should be able to verify that the fix works with the above mentioned
1282	// logic.
1283	kpOption := grpc.KeepaliveParams(keepalive.ServerParameters{
1284		Time:    time.Second,
1285		Timeout: 100 * time.Millisecond,
1286	})
1287	te.customServerOptions = append(te.customServerOptions, kpOption)
1288	te.startServer(&testServer{security: e.security})
1289	defer te.tearDown()
1290	cc := te.clientConn()
1291	tc := testpb.NewTestServiceClient(cc)
1292	start := time.Now()
1293	doIdleCallToInvokeKeepAlive(tc, t)
1294
1295	if err := verifyResultWithDelay(func() (bool, error) {
1296		ss, _ := channelz.GetServers(0, 0)
1297		if len(ss) != 1 {
1298			return false, fmt.Errorf("there should be one server, not %d", len(ss))
1299		}
1300		ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
1301		if len(ns) != 1 {
1302			return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
1303		}
1304		wantKeepalivesCount := int64(time.Since(start).Seconds()) - 1
1305		if gotKeepalivesCount := ns[0].SocketData.KeepAlivesSent; gotKeepalivesCount != wantKeepalivesCount {
1306			return false, fmt.Errorf("got keepalivesCount: %v, want keepalivesCount: %v", gotKeepalivesCount, wantKeepalivesCount)
1307		}
1308		return true, nil
1309	}); err != nil {
1310		t.Fatal(err)
1311	}
1312}
1313
// cipherSuites is the set of names TestCZSocketGetSecurityValueTLS accepts for
// the StandardName reported in a TLS socket's channelz security value. It
// includes the TLS 1.3 suite names (TLS_AES_*, TLS_CHACHA20_POLY1305_SHA256)
// as well as older-protocol suites.
// NOTE(review): TLS_FALLBACK_SCSV is a signaling value rather than a suite that
// can actually be negotiated; it is presumably listed for completeness.
var cipherSuites = []string{
	"TLS_RSA_WITH_RC4_128_SHA",
	"TLS_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_RSA_WITH_AES_128_CBC_SHA",
	"TLS_RSA_WITH_AES_256_CBC_SHA",
	"TLS_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
	"TLS_FALLBACK_SCSV",
	"TLS_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
	"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
	"TLS_AES_128_GCM_SHA256",
	"TLS_AES_256_GCM_SHA384",
	"TLS_CHACHA20_POLY1305_SHA256",
}
1342
1343func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) {
1344	czCleanup := channelz.NewChannelzStorage()
1345	defer czCleanupWrapper(czCleanup, t)
1346	e := tcpTLSRREnv
1347	te := newTest(t, e)
1348	te.startServer(&testServer{security: e.security})
1349	defer te.tearDown()
1350	te.clientConn()
1351	if err := verifyResultWithDelay(func() (bool, error) {
1352		tchan, _ := channelz.GetTopChannels(0, 0)
1353		if len(tchan) != 1 {
1354			return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1355		}
1356		if len(tchan[0].SubChans) != 1 {
1357			return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
1358		}
1359		var id int64
1360		for id = range tchan[0].SubChans {
1361			break
1362		}
1363		sc := channelz.GetSubChannel(id)
1364		if sc == nil {
1365			return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1366		}
1367		if len(sc.Sockets) != 1 {
1368			return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
1369		}
1370		for id = range sc.Sockets {
1371			break
1372		}
1373		skt := channelz.GetSocket(id)
1374		cert, _ := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key"))
1375		securityVal, ok := skt.SocketData.Security.(*credentials.TLSChannelzSecurityValue)
1376		if !ok {
1377			return false, fmt.Errorf("the SocketData.Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.SocketData.Security)
1378		}
1379		if !reflect.DeepEqual(securityVal.RemoteCertificate, cert.Certificate[0]) {
1380			return false, fmt.Errorf("SocketData.Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0])
1381		}
1382		for _, v := range cipherSuites {
1383			if v == securityVal.StandardName {
1384				return true, nil
1385			}
1386		}
1387		return false, fmt.Errorf("SocketData.Security.StandardName got: %v, want it to be one of %v", securityVal.StandardName, cipherSuites)
1388	}); err != nil {
1389		t.Fatal(err)
1390	}
1391}
1392
1393func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
1394	czCleanup := channelz.NewChannelzStorage()
1395	defer czCleanupWrapper(czCleanup, t)
1396	e := tcpClearRREnv
1397	// avoid calling API to set balancer type, which will void service config's change of balancer.
1398	e.balancer = ""
1399	te := newTest(t, e)
1400	r, cleanup := manual.GenerateAndRegisterManualResolver()
1401	defer cleanup()
1402	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
1403	r.InitialState(resolver.State{Addresses: resolvedAddrs})
1404	te.resolverScheme = r.Scheme()
1405	te.clientConn()
1406	defer te.tearDown()
1407	var nestedConn int64
1408	if err := verifyResultWithDelay(func() (bool, error) {
1409		tcs, _ := channelz.GetTopChannels(0, 0)
1410		if len(tcs) != 1 {
1411			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1412		}
1413		if len(tcs[0].NestedChans) != 1 {
1414			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
1415		}
1416		for k := range tcs[0].NestedChans {
1417			nestedConn = k
1418		}
1419		for _, e := range tcs[0].Trace.Events {
1420			if e.RefID == nestedConn && e.RefType != channelz.RefChannel {
1421				return false, fmt.Errorf("nested channel trace event shoud have RefChannel as RefType")
1422			}
1423		}
1424		ncm := channelz.GetChannel(nestedConn)
1425		if ncm.Trace == nil {
1426			return false, fmt.Errorf("trace for nested channel should not be empty")
1427		}
1428		if len(ncm.Trace.Events) == 0 {
1429			return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
1430		}
1431		if ncm.Trace.Events[0].Desc != "Channel Created" {
1432			return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc)
1433		}
1434		return true, nil
1435	}); err != nil {
1436		t.Fatal(err)
1437	}
1438
1439	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
1440
1441	// wait for the shutdown of grpclb balancer
1442	if err := verifyResultWithDelay(func() (bool, error) {
1443		tcs, _ := channelz.GetTopChannels(0, 0)
1444		if len(tcs) != 1 {
1445			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1446		}
1447		if len(tcs[0].NestedChans) != 0 {
1448			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
1449		}
1450		ncm := channelz.GetChannel(nestedConn)
1451		if ncm == nil {
1452			return false, fmt.Errorf("nested channel should still exist due to parent's trace reference")
1453		}
1454		if ncm.Trace == nil {
1455			return false, fmt.Errorf("trace for nested channel should not be empty")
1456		}
1457		if len(ncm.Trace.Events) == 0 {
1458			return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
1459		}
1460		if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" {
1461			return false, fmt.Errorf("the first trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[0].Desc)
1462		}
1463		return true, nil
1464	}); err != nil {
1465		t.Fatal(err)
1466	}
1467}
1468
1469func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
1470	czCleanup := channelz.NewChannelzStorage()
1471	defer czCleanupWrapper(czCleanup, t)
1472	e := tcpClearRREnv
1473	te := newTest(t, e)
1474	te.startServer(&testServer{security: e.security})
1475	r, cleanup := manual.GenerateAndRegisterManualResolver()
1476	defer cleanup()
1477	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1478	te.resolverScheme = r.Scheme()
1479	te.clientConn()
1480	defer te.tearDown()
1481	var subConn int64
1482	// Here, we just wait for all sockets to be up. In the future, if we implement
1483	// IDLE, we may need to make several rpc calls to create the sockets.
1484	if err := verifyResultWithDelay(func() (bool, error) {
1485		tcs, _ := channelz.GetTopChannels(0, 0)
1486		if len(tcs) != 1 {
1487			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1488		}
1489		if len(tcs[0].SubChans) != 1 {
1490			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1491		}
1492		for k := range tcs[0].SubChans {
1493			subConn = k
1494		}
1495		for _, e := range tcs[0].Trace.Events {
1496			if e.RefID == subConn && e.RefType != channelz.RefSubChannel {
1497				return false, fmt.Errorf("subchannel trace event shoud have RefType to be RefSubChannel")
1498			}
1499		}
1500		scm := channelz.GetSubChannel(subConn)
1501		if scm == nil {
1502			return false, fmt.Errorf("subChannel does not exist")
1503		}
1504		if scm.Trace == nil {
1505			return false, fmt.Errorf("trace for subChannel should not be empty")
1506		}
1507		if len(scm.Trace.Events) == 0 {
1508			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1509		}
1510		if scm.Trace.Events[0].Desc != "Subchannel Created" {
1511			return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc)
1512		}
1513		return true, nil
1514	}); err != nil {
1515		t.Fatal(err)
1516	}
1517
1518	// Wait for ready
1519	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1520	defer cancel()
1521	for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
1522		if !te.cc.WaitForStateChange(ctx, src) {
1523			t.Fatalf("timed out waiting for state change.  got %v; want %v", src, connectivity.Ready)
1524		}
1525	}
1526	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
1527	// Wait for not-ready.
1528	for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
1529		if !te.cc.WaitForStateChange(ctx, src) {
1530			t.Fatalf("timed out waiting for state change.  got %v; want !%v", src, connectivity.Ready)
1531		}
1532	}
1533
1534	if err := verifyResultWithDelay(func() (bool, error) {
1535		tcs, _ := channelz.GetTopChannels(0, 0)
1536		if len(tcs) != 1 {
1537			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1538		}
1539		if len(tcs[0].SubChans) != 1 {
1540			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1541		}
1542		scm := channelz.GetSubChannel(subConn)
1543		if scm == nil {
1544			return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
1545		}
1546		if scm.Trace == nil {
1547			return false, fmt.Errorf("trace for SubChannel should not be empty")
1548		}
1549		if len(scm.Trace.Events) == 0 {
1550			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1551		}
1552		if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want {
1553			return false, fmt.Errorf("the last trace event should be %q, not %q", want, got)
1554		}
1555
1556		return true, nil
1557	}); err != nil {
1558		t.Fatal(err)
1559	}
1560}
1561
1562func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
1563	czCleanup := channelz.NewChannelzStorage()
1564	defer czCleanupWrapper(czCleanup, t)
1565	e := tcpClearRREnv
1566	e.balancer = ""
1567	te := newTest(t, e)
1568	te.startServer(&testServer{security: e.security})
1569	r, cleanup := manual.GenerateAndRegisterManualResolver()
1570	defer cleanup()
1571	addrs := []resolver.Address{{Addr: te.srvAddr}}
1572	r.InitialState(resolver.State{Addresses: addrs})
1573	te.resolverScheme = r.Scheme()
1574	te.clientConn()
1575	defer te.tearDown()
1576	var cid int64
1577	// Here, we just wait for all sockets to be up. In the future, if we implement
1578	// IDLE, we may need to make several rpc calls to create the sockets.
1579	if err := verifyResultWithDelay(func() (bool, error) {
1580		tcs, _ := channelz.GetTopChannels(0, 0)
1581		if len(tcs) != 1 {
1582			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1583		}
1584		cid = tcs[0].ID
1585		for i := len(tcs[0].Trace.Events) - 1; i >= 0; i-- {
1586			if strings.Contains(tcs[0].Trace.Events[i].Desc, "resolver returned new addresses") {
1587				break
1588			}
1589			if i == 0 {
1590				return false, fmt.Errorf("events do not contain expected address resolution from empty address state.  Got: %+v", tcs[0].Trace.Events)
1591			}
1592		}
1593		return true, nil
1594	}); err != nil {
1595		t.Fatal(err)
1596	}
1597	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
1598
1599	if err := verifyResultWithDelay(func() (bool, error) {
1600		cm := channelz.GetChannel(cid)
1601		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
1602			if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) {
1603				break
1604			}
1605			if i == 0 {
1606				return false, fmt.Errorf("events do not contain expected address resolution change of LB policy")
1607			}
1608		}
1609		return true, nil
1610	}); err != nil {
1611		t.Fatal(err)
1612	}
1613
1614	newSC := parseCfg(r, `{
1615    "methodConfig": [
1616        {
1617            "name": [
1618                {
1619                    "service": "grpc.testing.TestService",
1620                    "method": "EmptyCall"
1621                }
1622            ],
1623            "waitForReady": false,
1624            "timeout": ".001s"
1625        }
1626    ]
1627}`)
1628	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC})
1629
1630	if err := verifyResultWithDelay(func() (bool, error) {
1631		cm := channelz.GetChannel(cid)
1632
1633		var es []string
1634		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
1635			if strings.Contains(cm.Trace.Events[i].Desc, "service config updated") {
1636				break
1637			}
1638			es = append(es, cm.Trace.Events[i].Desc)
1639			if i == 0 {
1640				return false, fmt.Errorf("events do not contain expected address resolution of new service config\n Events:\n%v", strings.Join(es, "\n"))
1641			}
1642		}
1643		return true, nil
1644	}); err != nil {
1645		t.Fatal(err)
1646	}
1647
1648	r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: newSC})
1649
1650	if err := verifyResultWithDelay(func() (bool, error) {
1651		cm := channelz.GetChannel(cid)
1652		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
1653			if strings.Contains(cm.Trace.Events[i].Desc, "resolver returned an empty address list") {
1654				break
1655			}
1656			if i == 0 {
1657				return false, fmt.Errorf("events do not contain expected address resolution of empty address")
1658			}
1659		}
1660		return true, nil
1661	}); err != nil {
1662		t.Fatal(err)
1663	}
1664}
1665
1666func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
1667	czCleanup := channelz.NewChannelzStorage()
1668	defer czCleanupWrapper(czCleanup, t)
1669	e := tcpClearRREnv
1670	e.balancer = ""
1671	te := newTest(t, e)
1672	te.startServers(&testServer{security: e.security}, 3)
1673	r, cleanup := manual.GenerateAndRegisterManualResolver()
1674	defer cleanup()
1675	var svrAddrs []resolver.Address
1676	for _, a := range te.srvAddrs {
1677		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
1678	}
1679	r.InitialState(resolver.State{Addresses: svrAddrs})
1680	te.resolverScheme = r.Scheme()
1681	cc := te.clientConn()
1682	defer te.tearDown()
1683	tc := testpb.NewTestServiceClient(cc)
1684	// make sure the connection is up
1685	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1686	defer cancel()
1687	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1688		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1689	}
1690	te.srvs[0].Stop()
1691	te.srvs[1].Stop()
1692	// Here, we just wait for all sockets to be up. In the future, if we implement
1693	// IDLE, we may need to make several rpc calls to create the sockets.
1694	if err := verifyResultWithDelay(func() (bool, error) {
1695		tcs, _ := channelz.GetTopChannels(0, 0)
1696		if len(tcs) != 1 {
1697			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1698		}
1699		if len(tcs[0].SubChans) != 1 {
1700			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1701		}
1702		var subConn int64
1703		for k := range tcs[0].SubChans {
1704			subConn = k
1705		}
1706		scm := channelz.GetSubChannel(subConn)
1707		if scm.Trace == nil {
1708			return false, fmt.Errorf("trace for SubChannel should not be empty")
1709		}
1710		if len(scm.Trace.Events) == 0 {
1711			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1712		}
1713		for i := len(scm.Trace.Events) - 1; i >= 0; i-- {
1714			if scm.Trace.Events[i].Desc == fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2]) {
1715				break
1716			}
1717			if i == 0 {
1718				return false, fmt.Errorf("events do not contain expected address resolution of subchannel picked new address")
1719			}
1720		}
1721		return true, nil
1722	}); err != nil {
1723		t.Fatal(err)
1724	}
1725}
1726
1727func (s) TestCZSubChannelConnectivityState(t *testing.T) {
1728	czCleanup := channelz.NewChannelzStorage()
1729	defer czCleanupWrapper(czCleanup, t)
1730	e := tcpClearRREnv
1731	te := newTest(t, e)
1732	te.startServer(&testServer{security: e.security})
1733	r, cleanup := manual.GenerateAndRegisterManualResolver()
1734	defer cleanup()
1735	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1736	te.resolverScheme = r.Scheme()
1737	cc := te.clientConn()
1738	defer te.tearDown()
1739	tc := testpb.NewTestServiceClient(cc)
1740	// make sure the connection is up
1741	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1742	defer cancel()
1743	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1744		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1745	}
1746	var subConn int64
1747	te.srv.Stop()
1748
1749	if err := verifyResultWithDelay(func() (bool, error) {
1750		// we need to obtain the SubChannel id before it gets deleted from Channel's children list (due
1751		// to effect of r.UpdateState(resolver.State{Addresses:[]resolver.Address{}}))
1752		if subConn == 0 {
1753			tcs, _ := channelz.GetTopChannels(0, 0)
1754			if len(tcs) != 1 {
1755				return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1756			}
1757			if len(tcs[0].SubChans) != 1 {
1758				return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1759			}
1760			for k := range tcs[0].SubChans {
1761				// get the SubChannel id for further trace inquiry.
1762				subConn = k
1763			}
1764		}
1765		scm := channelz.GetSubChannel(subConn)
1766		if scm == nil {
1767			return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
1768		}
1769		if scm.Trace == nil {
1770			return false, fmt.Errorf("trace for SubChannel should not be empty")
1771		}
1772		if len(scm.Trace.Events) == 0 {
1773			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1774		}
1775		var ready, connecting, transient, shutdown int
1776		for _, e := range scm.Trace.Events {
1777			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
1778				transient++
1779			}
1780		}
1781		// Make sure the SubChannel has already seen transient failure before shutting it down through
1782		// r.UpdateState(resolver.State{Addresses:[]resolver.Address{}}).
1783		if transient == 0 {
1784			return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
1785		}
1786		transient = 0
1787		r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
1788		for _, e := range scm.Trace.Events {
1789			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
1790				ready++
1791			}
1792			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting) {
1793				connecting++
1794			}
1795			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
1796				transient++
1797			}
1798			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown) {
1799				shutdown++
1800			}
1801		}
1802		// example:
1803		// Subchannel Created
1804		// Subchannel's connectivity state changed to CONNECTING
1805		// Subchannel picked a new address: "localhost:36011"
1806		// Subchannel's connectivity state changed to READY
1807		// Subchannel's connectivity state changed to TRANSIENT_FAILURE
1808		// Subchannel's connectivity state changed to CONNECTING
1809		// Subchannel picked a new address: "localhost:36011"
1810		// Subchannel's connectivity state changed to SHUTDOWN
1811		// Subchannel Deleted
1812		if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 {
1813			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown)
1814		}
1815
1816		return true, nil
1817	}); err != nil {
1818		t.Fatal(err)
1819	}
1820}
1821
1822func (s) TestCZChannelConnectivityState(t *testing.T) {
1823	czCleanup := channelz.NewChannelzStorage()
1824	defer czCleanupWrapper(czCleanup, t)
1825	e := tcpClearRREnv
1826	te := newTest(t, e)
1827	te.startServer(&testServer{security: e.security})
1828	r, cleanup := manual.GenerateAndRegisterManualResolver()
1829	defer cleanup()
1830	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1831	te.resolverScheme = r.Scheme()
1832	cc := te.clientConn()
1833	defer te.tearDown()
1834	tc := testpb.NewTestServiceClient(cc)
1835	// make sure the connection is up
1836	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1837	defer cancel()
1838	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1839		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1840	}
1841	te.srv.Stop()
1842	if err := verifyResultWithDelay(func() (bool, error) {
1843		tcs, _ := channelz.GetTopChannels(0, 0)
1844		if len(tcs) != 1 {
1845			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1846		}
1847
1848		var ready, connecting, transient int
1849		for _, e := range tcs[0].Trace.Events {
1850			if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready) {
1851				ready++
1852			}
1853			if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting) {
1854				connecting++
1855			}
1856			if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure) {
1857				transient++
1858			}
1859		}
1860
1861		// example:
1862		// Channel Created
1863		// Adressses resolved (from empty address state): "localhost:40467"
1864		// SubChannel (id: 4[]) Created
1865		// Channel's connectivity state changed to CONNECTING
1866		// Channel's connectivity state changed to READY
1867		// Channel's connectivity state changed to TRANSIENT_FAILURE
1868		// Channel's connectivity state changed to CONNECTING
1869		// Channel's connectivity state changed to TRANSIENT_FAILURE
1870		if ready != 1 || connecting < 1 || transient < 1 {
1871			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient)
1872		}
1873		return true, nil
1874	}); err != nil {
1875		t.Fatal(err)
1876	}
1877}
1878
1879func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
1880	czCleanup := channelz.NewChannelzStorage()
1881	defer czCleanupWrapper(czCleanup, t)
1882	e := tcpClearRREnv
1883	// avoid newTest using WithBalancer, which would override service config's change of balancer below.
1884	e.balancer = ""
1885	te := newTest(t, e)
1886	channelz.SetMaxTraceEntry(1)
1887	defer channelz.ResetMaxTraceEntryToDefault()
1888	r, cleanup := manual.GenerateAndRegisterManualResolver()
1889	defer cleanup()
1890	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
1891	r.InitialState(resolver.State{Addresses: resolvedAddrs})
1892	te.resolverScheme = r.Scheme()
1893	te.clientConn()
1894	defer te.tearDown()
1895	var nestedConn int64
1896	if err := verifyResultWithDelay(func() (bool, error) {
1897		tcs, _ := channelz.GetTopChannels(0, 0)
1898		if len(tcs) != 1 {
1899			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1900		}
1901		if len(tcs[0].NestedChans) != 1 {
1902			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
1903		}
1904		for k := range tcs[0].NestedChans {
1905			nestedConn = k
1906		}
1907		return true, nil
1908	}); err != nil {
1909		t.Fatal(err)
1910	}
1911
1912	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
1913
1914	// wait for the shutdown of grpclb balancer
1915	if err := verifyResultWithDelay(func() (bool, error) {
1916		tcs, _ := channelz.GetTopChannels(0, 0)
1917		if len(tcs) != 1 {
1918			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1919		}
1920		if len(tcs[0].NestedChans) != 0 {
1921			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
1922		}
1923		return true, nil
1924	}); err != nil {
1925		t.Fatal(err)
1926	}
1927
1928	// If nested channel deletion is last trace event before the next validation, it will fail, as the top channel will hold a reference to it.
1929	// This line forces a trace event on the top channel in that case.
1930	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
1931
1932	// verify that the nested channel no longer exist due to trace referencing it got overwritten.
1933	if err := verifyResultWithDelay(func() (bool, error) {
1934		cm := channelz.GetChannel(nestedConn)
1935		if cm != nil {
1936			return false, fmt.Errorf("nested channel should have been deleted since its parent's trace should not contain any reference to it anymore")
1937		}
1938		return true, nil
1939	}); err != nil {
1940		t.Fatal(err)
1941	}
1942}
1943
1944func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
1945	czCleanup := channelz.NewChannelzStorage()
1946	defer czCleanupWrapper(czCleanup, t)
1947	e := tcpClearRREnv
1948	te := newTest(t, e)
1949	channelz.SetMaxTraceEntry(1)
1950	defer channelz.ResetMaxTraceEntryToDefault()
1951	te.startServer(&testServer{security: e.security})
1952	r, cleanup := manual.GenerateAndRegisterManualResolver()
1953	defer cleanup()
1954	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1955	te.resolverScheme = r.Scheme()
1956	te.clientConn()
1957	defer te.tearDown()
1958	var subConn int64
1959	// Here, we just wait for all sockets to be up. In the future, if we implement
1960	// IDLE, we may need to make several rpc calls to create the sockets.
1961	if err := verifyResultWithDelay(func() (bool, error) {
1962		tcs, _ := channelz.GetTopChannels(0, 0)
1963		if len(tcs) != 1 {
1964			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1965		}
1966		if len(tcs[0].SubChans) != 1 {
1967			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
1968		}
1969		for k := range tcs[0].SubChans {
1970			subConn = k
1971		}
1972		return true, nil
1973	}); err != nil {
1974		t.Fatal(err)
1975	}
1976
1977	// Wait for ready
1978	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1979	defer cancel()
1980	for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
1981		if !te.cc.WaitForStateChange(ctx, src) {
1982			t.Fatalf("timed out waiting for state change.  got %v; want %v", src, connectivity.Ready)
1983		}
1984	}
1985	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
1986	// Wait for not-ready.
1987	for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
1988		if !te.cc.WaitForStateChange(ctx, src) {
1989			t.Fatalf("timed out waiting for state change.  got %v; want !%v", src, connectivity.Ready)
1990		}
1991	}
1992
1993	// verify that the subchannel no longer exist due to trace referencing it got overwritten.
1994	if err := verifyResultWithDelay(func() (bool, error) {
1995		cm := channelz.GetChannel(subConn)
1996		if cm != nil {
1997			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
1998		}
1999		return true, nil
2000	}); err != nil {
2001		t.Fatal(err)
2002	}
2003}
2004
2005func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
2006	czCleanup := channelz.NewChannelzStorage()
2007	defer czCleanupWrapper(czCleanup, t)
2008	e := tcpClearRREnv
2009	te := newTest(t, e)
2010	te.startServer(&testServer{security: e.security})
2011	r, cleanup := manual.GenerateAndRegisterManualResolver()
2012	defer cleanup()
2013	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
2014	te.resolverScheme = r.Scheme()
2015	te.clientConn()
2016	var subConn int64
2017	// Here, we just wait for all sockets to be up. In the future, if we implement
2018	// IDLE, we may need to make several rpc calls to create the sockets.
2019	if err := verifyResultWithDelay(func() (bool, error) {
2020		tcs, _ := channelz.GetTopChannels(0, 0)
2021		if len(tcs) != 1 {
2022			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
2023		}
2024		if len(tcs[0].SubChans) != 1 {
2025			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
2026		}
2027		for k := range tcs[0].SubChans {
2028			subConn = k
2029		}
2030		return true, nil
2031	}); err != nil {
2032		t.Fatal(err)
2033	}
2034	te.tearDown()
2035	// verify that the subchannel no longer exist due to parent channel got deleted and its trace cleared.
2036	if err := verifyResultWithDelay(func() (bool, error) {
2037		cm := channelz.GetChannel(subConn)
2038		if cm != nil {
2039			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
2040		}
2041		return true, nil
2042	}); err != nil {
2043		t.Fatal(err)
2044	}
2045}
2046