1/* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package test 20 21import ( 22 "context" 23 "crypto/tls" 24 "fmt" 25 "net" 26 "reflect" 27 "strings" 28 "sync" 29 "testing" 30 "time" 31 32 "golang.org/x/net/http2" 33 "google.golang.org/grpc" 34 _ "google.golang.org/grpc/balancer/grpclb" 35 "google.golang.org/grpc/balancer/roundrobin" 36 "google.golang.org/grpc/codes" 37 "google.golang.org/grpc/connectivity" 38 "google.golang.org/grpc/credentials" 39 "google.golang.org/grpc/internal" 40 "google.golang.org/grpc/internal/channelz" 41 "google.golang.org/grpc/internal/stubserver" 42 "google.golang.org/grpc/keepalive" 43 "google.golang.org/grpc/resolver" 44 "google.golang.org/grpc/resolver/manual" 45 "google.golang.org/grpc/status" 46 testpb "google.golang.org/grpc/test/grpc_testing" 47 "google.golang.org/grpc/testdata" 48) 49 50func czCleanupWrapper(cleanup func() error, t *testing.T) { 51 if err := cleanup(); err != nil { 52 t.Error(err) 53 } 54} 55 56func verifyResultWithDelay(f func() (bool, error)) error { 57 var ok bool 58 var err error 59 for i := 0; i < 1000; i++ { 60 if ok, err = f(); ok { 61 return nil 62 } 63 time.Sleep(10 * time.Millisecond) 64 } 65 return err 66} 67 68func (s) TestCZServerRegistrationAndDeletion(t *testing.T) { 69 testcases := []struct { 70 total int 71 start int64 72 max int64 73 length int64 74 end bool 75 }{ 76 {total: int(channelz.EntryPerPage), start: 0, max: 0, 
length: channelz.EntryPerPage, end: true}, 77 {total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true}, 78 {total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false}, 79 {total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true}, 80 {total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false}, 81 {total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false}, 82 } 83 84 for _, c := range testcases { 85 czCleanup := channelz.NewChannelzStorage() 86 defer czCleanupWrapper(czCleanup, t) 87 e := tcpClearRREnv 88 te := newTest(t, e) 89 te.startServers(&testServer{security: e.security}, c.total) 90 91 ss, end := channelz.GetServers(c.start, c.max) 92 if int64(len(ss)) != c.length || end != c.end { 93 t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end) 94 } 95 te.tearDown() 96 ss, end = channelz.GetServers(c.start, c.max) 97 if len(ss) != 0 || !end { 98 t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end) 99 } 100 } 101} 102 103func (s) TestCZGetServer(t *testing.T) { 104 czCleanup := channelz.NewChannelzStorage() 105 defer czCleanupWrapper(czCleanup, t) 106 e := tcpClearRREnv 107 te := newTest(t, e) 108 te.startServer(&testServer{security: e.security}) 109 defer te.tearDown() 110 111 ss, _ := channelz.GetServers(0, 0) 112 if len(ss) != 1 { 113 t.Fatalf("there should only be one server, not %d", len(ss)) 114 } 115 116 serverID := ss[0].ID 117 srv := channelz.GetServer(serverID) 118 if srv == nil { 119 t.Fatalf("server %d does not exist", serverID) 120 } 121 if srv.ID != serverID { 122 t.Fatalf("server want id %d, but got %d", serverID, srv.ID) 123 } 124 125 te.tearDown() 126 127 if 
err := verifyResultWithDelay(func() (bool, error) { 128 srv := channelz.GetServer(serverID) 129 if srv != nil { 130 return false, fmt.Errorf("server %d should not exist", serverID) 131 } 132 133 return true, nil 134 }); err != nil { 135 t.Fatal(err) 136 } 137} 138 139func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) { 140 testcases := []struct { 141 total int 142 start int64 143 max int64 144 length int64 145 end bool 146 }{ 147 {total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true}, 148 {total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true}, 149 {total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false}, 150 {total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true}, 151 {total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false}, 152 {total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false}, 153 } 154 155 for _, c := range testcases { 156 czCleanup := channelz.NewChannelzStorage() 157 defer czCleanupWrapper(czCleanup, t) 158 e := tcpClearRREnv 159 te := newTest(t, e) 160 var ccs []*grpc.ClientConn 161 for i := 0; i < c.total; i++ { 162 cc := te.clientConn() 163 te.cc = nil 164 // avoid making next dial blocking 165 te.srvAddr = "" 166 ccs = append(ccs, cc) 167 } 168 if err := verifyResultWithDelay(func() (bool, error) { 169 if tcs, end := channelz.GetTopChannels(c.start, c.max); int64(len(tcs)) != c.length || end != c.end { 170 return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end) 171 } 172 return true, nil 173 }); err != nil { 174 t.Fatal(err) 175 } 176 177 for _, cc := range ccs { 178 cc.Close() 179 } 180 181 if err := verifyResultWithDelay(func() (bool, 
error) { 182 if tcs, end := channelz.GetTopChannels(c.start, c.max); len(tcs) != 0 || !end { 183 return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end) 184 } 185 return true, nil 186 }); err != nil { 187 t.Fatal(err) 188 } 189 te.tearDown() 190 } 191} 192 193func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) { 194 czCleanup := channelz.NewChannelzStorage() 195 defer czCleanupWrapper(czCleanup, t) 196 // Make dial fails (due to no transport security specified) 197 _, err := grpc.Dial("fake.addr") 198 if err == nil { 199 t.Fatal("expecting dial to fail") 200 } 201 if tcs, end := channelz.GetTopChannels(0, 0); tcs != nil || !end { 202 t.Fatalf("GetTopChannels(0, 0) = %v, %v, want <nil>, true", tcs, end) 203 } 204} 205 206func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) { 207 czCleanup := channelz.NewChannelzStorage() 208 defer czCleanupWrapper(czCleanup, t) 209 e := tcpClearRREnv 210 // avoid calling API to set balancer type, which will void service config's change of balancer. 
211 e.balancer = "" 212 te := newTest(t, e) 213 r := manual.NewBuilderWithScheme("whatever") 214 resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}} 215 r.InitialState(resolver.State{Addresses: resolvedAddrs}) 216 te.resolverScheme = r.Scheme() 217 te.clientConn(grpc.WithResolvers(r)) 218 defer te.tearDown() 219 220 if err := verifyResultWithDelay(func() (bool, error) { 221 tcs, _ := channelz.GetTopChannels(0, 0) 222 if len(tcs) != 1 { 223 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 224 } 225 if len(tcs[0].NestedChans) != 1 { 226 return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans)) 227 } 228 return true, nil 229 }); err != nil { 230 t.Fatal(err) 231 } 232 233 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)}) 234 235 // wait for the shutdown of grpclb balancer 236 if err := verifyResultWithDelay(func() (bool, error) { 237 tcs, _ := channelz.GetTopChannels(0, 0) 238 if len(tcs) != 1 { 239 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 240 } 241 if len(tcs[0].NestedChans) != 0 { 242 return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans)) 243 } 244 return true, nil 245 }); err != nil { 246 t.Fatal(err) 247 } 248} 249 250func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) { 251 czCleanup := channelz.NewChannelzStorage() 252 defer czCleanupWrapper(czCleanup, t) 253 e := tcpClearRREnv 254 num := 3 // number of backends 255 te := newTest(t, e) 256 var svrAddrs []resolver.Address 257 te.startServers(&testServer{security: e.security}, num) 258 r := manual.NewBuilderWithScheme("whatever") 259 for _, a := range te.srvAddrs { 260 svrAddrs = append(svrAddrs, resolver.Address{Addr: a}) 261 } 262 
r.InitialState(resolver.State{Addresses: svrAddrs}) 263 te.resolverScheme = r.Scheme() 264 te.clientConn(grpc.WithResolvers(r)) 265 defer te.tearDown() 266 // Here, we just wait for all sockets to be up. In the future, if we implement 267 // IDLE, we may need to make several rpc calls to create the sockets. 268 if err := verifyResultWithDelay(func() (bool, error) { 269 tcs, _ := channelz.GetTopChannels(0, 0) 270 if len(tcs) != 1 { 271 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 272 } 273 if len(tcs[0].SubChans) != num { 274 return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans)) 275 } 276 count := 0 277 for k := range tcs[0].SubChans { 278 sc := channelz.GetSubChannel(k) 279 if sc == nil { 280 return false, fmt.Errorf("got <nil> subchannel") 281 } 282 count += len(sc.Sockets) 283 } 284 if count != num { 285 return false, fmt.Errorf("there should be %d sockets not %d", num, count) 286 } 287 288 return true, nil 289 }); err != nil { 290 t.Fatal(err) 291 } 292 293 r.UpdateState(resolver.State{Addresses: svrAddrs[:len(svrAddrs)-1]}) 294 295 if err := verifyResultWithDelay(func() (bool, error) { 296 tcs, _ := channelz.GetTopChannels(0, 0) 297 if len(tcs) != 1 { 298 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 299 } 300 if len(tcs[0].SubChans) != num-1 { 301 return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(tcs[0].SubChans)) 302 } 303 count := 0 304 for k := range tcs[0].SubChans { 305 sc := channelz.GetSubChannel(k) 306 if sc == nil { 307 return false, fmt.Errorf("got <nil> subchannel") 308 } 309 count += len(sc.Sockets) 310 } 311 if count != num-1 { 312 return false, fmt.Errorf("there should be %d sockets not %d", num-1, count) 313 } 314 315 return true, nil 316 }); err != nil { 317 t.Fatal(err) 318 } 319} 320 321func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) { 322 testcases := []struct { 323 total int 324 start 
int64 325 max int64 326 length int64 327 end bool 328 }{ 329 {total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true}, 330 {total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true}, 331 {total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false}, 332 {total: int(channelz.EntryPerPage), start: 1, max: 0, length: channelz.EntryPerPage - 1, end: true}, 333 {total: int(channelz.EntryPerPage) + 1, start: channelz.EntryPerPage + 1, max: 0, length: 0, end: true}, 334 {total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false}, 335 {total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false}, 336 } 337 338 for _, c := range testcases { 339 czCleanup := channelz.NewChannelzStorage() 340 defer czCleanupWrapper(czCleanup, t) 341 e := tcpClearRREnv 342 te := newTest(t, e) 343 te.startServer(&testServer{security: e.security}) 344 var ccs []*grpc.ClientConn 345 for i := 0; i < c.total; i++ { 346 cc := te.clientConn() 347 te.cc = nil 348 ccs = append(ccs, cc) 349 } 350 351 var svrID int64 352 if err := verifyResultWithDelay(func() (bool, error) { 353 ss, _ := channelz.GetServers(0, 0) 354 if len(ss) != 1 { 355 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 356 } 357 if len(ss[0].ListenSockets) != 1 { 358 return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets)) 359 } 360 361 startID := c.start 362 if startID != 0 { 363 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, int64(c.total)) 364 if int64(len(ns)) < c.start { 365 return false, fmt.Errorf("there should more than %d sockets, not %d", len(ns), c.start) 366 } 367 startID = ns[c.start-1].ID + 1 368 } 369 370 ns, end := channelz.GetServerSockets(ss[0].ID, startID, c.max) 371 if int64(len(ns)) != c.length || end != c.end { 372 return false, 
fmt.Errorf("GetServerSockets(%d) = %+v (len of which: %d), end: %+v, want len(GetServerSockets(%d)) = %d, end: %+v", c.start, ns, len(ns), end, c.start, c.length, c.end) 373 } 374 375 svrID = ss[0].ID 376 return true, nil 377 }); err != nil { 378 t.Fatal(err) 379 } 380 381 for _, cc := range ccs { 382 cc.Close() 383 } 384 385 if err := verifyResultWithDelay(func() (bool, error) { 386 ns, _ := channelz.GetServerSockets(svrID, c.start, c.max) 387 if len(ns) != 0 { 388 return false, fmt.Errorf("there should be %d normal sockets not %d", 0, len(ns)) 389 } 390 return true, nil 391 }); err != nil { 392 t.Fatal(err) 393 } 394 te.tearDown() 395 } 396} 397 398func (s) TestCZServerListenSocketDeletion(t *testing.T) { 399 czCleanup := channelz.NewChannelzStorage() 400 defer czCleanupWrapper(czCleanup, t) 401 s := grpc.NewServer() 402 lis, err := net.Listen("tcp", "localhost:0") 403 if err != nil { 404 t.Fatalf("failed to listen: %v", err) 405 } 406 go s.Serve(lis) 407 if err := verifyResultWithDelay(func() (bool, error) { 408 ss, _ := channelz.GetServers(0, 0) 409 if len(ss) != 1 { 410 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 411 } 412 if len(ss[0].ListenSockets) != 1 { 413 return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets)) 414 } 415 return true, nil 416 }); err != nil { 417 t.Fatal(err) 418 } 419 420 lis.Close() 421 if err := verifyResultWithDelay(func() (bool, error) { 422 ss, _ := channelz.GetServers(0, 0) 423 if len(ss) != 1 { 424 return false, fmt.Errorf("there should be 1 server, not %d", len(ss)) 425 } 426 if len(ss[0].ListenSockets) != 0 { 427 return false, fmt.Errorf("there should only be %d server listen socket, not %d", 0, len(ss[0].ListenSockets)) 428 } 429 return true, nil 430 }); err != nil { 431 t.Fatal(err) 432 } 433 s.Stop() 434} 435 436type dummyChannel struct{} 437 438func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 439 return 
&channelz.ChannelInternalMetric{} 440} 441 442type dummySocket struct{} 443 444func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric { 445 return &channelz.SocketInternalMetric{} 446} 447 448func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) { 449 // +--+TopChan+---+ 450 // | | 451 // v v 452 // +-+SubChan1+--+ SubChan2 453 // | | 454 // v v 455 // Socket1 Socket2 456 czCleanup := channelz.NewChannelzStorage() 457 defer czCleanupWrapper(czCleanup, t) 458 topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "") 459 subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") 460 subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") 461 sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") 462 sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") 463 464 tcs, _ := channelz.GetTopChannels(0, 0) 465 if tcs == nil || len(tcs) != 1 { 466 t.Fatalf("There should be one TopChannel entry") 467 } 468 if len(tcs[0].SubChans) != 2 { 469 t.Fatalf("There should be two SubChannel entries") 470 } 471 sc := channelz.GetSubChannel(subChanID1) 472 if sc == nil || len(sc.Sockets) != 2 { 473 t.Fatalf("There should be two Socket entries") 474 } 475 476 channelz.RemoveEntry(topChanID) 477 tcs, _ = channelz.GetTopChannels(0, 0) 478 if tcs == nil || len(tcs) != 1 { 479 t.Fatalf("There should be one TopChannel entry") 480 } 481 482 channelz.RemoveEntry(subChanID1) 483 channelz.RemoveEntry(subChanID2) 484 tcs, _ = channelz.GetTopChannels(0, 0) 485 if tcs == nil || len(tcs) != 1 { 486 t.Fatalf("There should be one TopChannel entry") 487 } 488 if len(tcs[0].SubChans) != 1 { 489 t.Fatalf("There should be one SubChannel entry") 490 } 491 492 channelz.RemoveEntry(sktID1) 493 channelz.RemoveEntry(sktID2) 494 tcs, _ = channelz.GetTopChannels(0, 0) 495 if tcs != nil { 496 t.Fatalf("There should be no TopChannel entry") 497 } 498} 499 500func (s) TestCZChannelMetrics(t *testing.T) { 501 czCleanup := 
channelz.NewChannelzStorage() 502 defer czCleanupWrapper(czCleanup, t) 503 e := tcpClearRREnv 504 num := 3 // number of backends 505 te := newTest(t, e) 506 te.maxClientSendMsgSize = newInt(8) 507 var svrAddrs []resolver.Address 508 te.startServers(&testServer{security: e.security}, num) 509 r := manual.NewBuilderWithScheme("whatever") 510 for _, a := range te.srvAddrs { 511 svrAddrs = append(svrAddrs, resolver.Address{Addr: a}) 512 } 513 r.InitialState(resolver.State{Addresses: svrAddrs}) 514 te.resolverScheme = r.Scheme() 515 cc := te.clientConn(grpc.WithResolvers(r)) 516 defer te.tearDown() 517 tc := testpb.NewTestServiceClient(cc) 518 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 519 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 520 } 521 522 const smallSize = 1 523 const largeSize = 8 524 525 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 526 if err != nil { 527 t.Fatal(err) 528 } 529 req := &testpb.SimpleRequest{ 530 ResponseType: testpb.PayloadType_COMPRESSABLE, 531 ResponseSize: int32(smallSize), 532 Payload: largePayload, 533 } 534 535 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 536 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 537 } 538 539 stream, err := tc.FullDuplexCall(context.Background()) 540 if err != nil { 541 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 542 } 543 defer stream.CloseSend() 544 // Here, we just wait for all sockets to be up. In the future, if we implement 545 // IDLE, we may need to make several rpc calls to create the sockets. 
546 if err := verifyResultWithDelay(func() (bool, error) { 547 tcs, _ := channelz.GetTopChannels(0, 0) 548 if len(tcs) != 1 { 549 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 550 } 551 if len(tcs[0].SubChans) != num { 552 return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans)) 553 } 554 var cst, csu, cf int64 555 for k := range tcs[0].SubChans { 556 sc := channelz.GetSubChannel(k) 557 if sc == nil { 558 return false, fmt.Errorf("got <nil> subchannel") 559 } 560 cst += sc.ChannelData.CallsStarted 561 csu += sc.ChannelData.CallsSucceeded 562 cf += sc.ChannelData.CallsFailed 563 } 564 if cst != 3 { 565 return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst) 566 } 567 if csu != 1 { 568 return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu) 569 } 570 if cf != 1 { 571 return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf) 572 } 573 if tcs[0].ChannelData.CallsStarted != 3 { 574 return false, fmt.Errorf("there should be 3 CallsStarted not %d", tcs[0].ChannelData.CallsStarted) 575 } 576 if tcs[0].ChannelData.CallsSucceeded != 1 { 577 return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", tcs[0].ChannelData.CallsSucceeded) 578 } 579 if tcs[0].ChannelData.CallsFailed != 1 { 580 return false, fmt.Errorf("there should be 1 CallsFailed not %d", tcs[0].ChannelData.CallsFailed) 581 } 582 return true, nil 583 }); err != nil { 584 t.Fatal(err) 585 } 586} 587 588func (s) TestCZServerMetrics(t *testing.T) { 589 czCleanup := channelz.NewChannelzStorage() 590 defer czCleanupWrapper(czCleanup, t) 591 e := tcpClearRREnv 592 te := newTest(t, e) 593 te.maxServerReceiveMsgSize = newInt(8) 594 te.startServer(&testServer{security: e.security}) 595 defer te.tearDown() 596 cc := te.clientConn() 597 tc := testpb.NewTestServiceClient(cc) 598 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 599 t.Fatalf("TestService/EmptyCall(_, _) = _, 
%v, want _, <nil>", err) 600 } 601 602 const smallSize = 1 603 const largeSize = 8 604 605 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 606 if err != nil { 607 t.Fatal(err) 608 } 609 req := &testpb.SimpleRequest{ 610 ResponseType: testpb.PayloadType_COMPRESSABLE, 611 ResponseSize: int32(smallSize), 612 Payload: largePayload, 613 } 614 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 615 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 616 } 617 618 stream, err := tc.FullDuplexCall(context.Background()) 619 if err != nil { 620 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 621 } 622 defer stream.CloseSend() 623 624 if err := verifyResultWithDelay(func() (bool, error) { 625 ss, _ := channelz.GetServers(0, 0) 626 if len(ss) != 1 { 627 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 628 } 629 if ss[0].ServerData.CallsStarted != 3 { 630 return false, fmt.Errorf("there should be 3 CallsStarted not %d", ss[0].ServerData.CallsStarted) 631 } 632 if ss[0].ServerData.CallsSucceeded != 1 { 633 return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", ss[0].ServerData.CallsSucceeded) 634 } 635 if ss[0].ServerData.CallsFailed != 1 { 636 return false, fmt.Errorf("there should be 1 CallsFailed not %d", ss[0].ServerData.CallsFailed) 637 } 638 return true, nil 639 }); err != nil { 640 t.Fatal(err) 641 } 642} 643 644type testServiceClientWrapper struct { 645 testpb.TestServiceClient 646 mu sync.RWMutex 647 streamsCreated int 648} 649 650func (t *testServiceClientWrapper) getCurrentStreamID() uint32 { 651 t.mu.RLock() 652 defer t.mu.RUnlock() 653 return uint32(2*t.streamsCreated - 1) 654} 655 656func (t *testServiceClientWrapper) EmptyCall(ctx context.Context, in *testpb.Empty, opts ...grpc.CallOption) (*testpb.Empty, error) { 657 t.mu.Lock() 658 defer t.mu.Unlock() 659 
t.streamsCreated++ 660 return t.TestServiceClient.EmptyCall(ctx, in, opts...) 661} 662 663func (t *testServiceClientWrapper) UnaryCall(ctx context.Context, in *testpb.SimpleRequest, opts ...grpc.CallOption) (*testpb.SimpleResponse, error) { 664 t.mu.Lock() 665 defer t.mu.Unlock() 666 t.streamsCreated++ 667 return t.TestServiceClient.UnaryCall(ctx, in, opts...) 668} 669 670func (t *testServiceClientWrapper) StreamingOutputCall(ctx context.Context, in *testpb.StreamingOutputCallRequest, opts ...grpc.CallOption) (testpb.TestService_StreamingOutputCallClient, error) { 671 t.mu.Lock() 672 defer t.mu.Unlock() 673 t.streamsCreated++ 674 return t.TestServiceClient.StreamingOutputCall(ctx, in, opts...) 675} 676 677func (t *testServiceClientWrapper) StreamingInputCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_StreamingInputCallClient, error) { 678 t.mu.Lock() 679 defer t.mu.Unlock() 680 t.streamsCreated++ 681 return t.TestServiceClient.StreamingInputCall(ctx, opts...) 682} 683 684func (t *testServiceClientWrapper) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_FullDuplexCallClient, error) { 685 t.mu.Lock() 686 defer t.mu.Unlock() 687 t.streamsCreated++ 688 return t.TestServiceClient.FullDuplexCall(ctx, opts...) 689} 690 691func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_HalfDuplexCallClient, error) { 692 t.mu.Lock() 693 defer t.mu.Unlock() 694 t.streamsCreated++ 695 return t.TestServiceClient.HalfDuplexCall(ctx, opts...) 
696} 697 698func doSuccessfulUnaryCall(tc testpb.TestServiceClient, t *testing.T) { 699 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 700 defer cancel() 701 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { 702 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 703 } 704} 705 706func doStreamingInputCallWithLargePayload(tc testpb.TestServiceClient, t *testing.T) { 707 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 708 defer cancel() 709 s, err := tc.StreamingInputCall(ctx) 710 if err != nil { 711 t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err) 712 } 713 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10000) 714 if err != nil { 715 t.Fatal(err) 716 } 717 s.Send(&testpb.StreamingInputCallRequest{Payload: payload}) 718} 719 720func doServerSideFailedUnaryCall(tc testpb.TestServiceClient, t *testing.T) { 721 const smallSize = 1 722 const largeSize = 2000 723 724 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 725 if err != nil { 726 t.Fatal(err) 727 } 728 req := &testpb.SimpleRequest{ 729 ResponseType: testpb.PayloadType_COMPRESSABLE, 730 ResponseSize: int32(smallSize), 731 Payload: largePayload, 732 } 733 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 734 defer cancel() 735 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { 736 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 737 } 738} 739 740func doClientSideInitiatedFailedStream(tc testpb.TestServiceClient, t *testing.T) { 741 ctx, cancel := context.WithCancel(context.Background()) 742 stream, err := tc.FullDuplexCall(ctx) 743 if err != nil { 744 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 745 } 746 747 const smallSize = 1 748 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 749 if err != 
nil { 750 t.Fatal(err) 751 } 752 753 sreq := &testpb.StreamingOutputCallRequest{ 754 ResponseType: testpb.PayloadType_COMPRESSABLE, 755 ResponseParameters: []*testpb.ResponseParameters{ 756 {Size: smallSize}, 757 }, 758 Payload: smallPayload, 759 } 760 761 if err := stream.Send(sreq); err != nil { 762 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 763 } 764 if _, err := stream.Recv(); err != nil { 765 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 766 } 767 // By canceling the call, the client will send rst_stream to end the call, and 768 // the stream will failed as a result. 769 cancel() 770} 771 772// This func is to be used to test client side counting of failed streams. 773func doServerSideInitiatedFailedStreamWithRSTStream(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) { 774 stream, err := tc.FullDuplexCall(context.Background()) 775 if err != nil { 776 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 777 } 778 779 const smallSize = 1 780 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 781 if err != nil { 782 t.Fatal(err) 783 } 784 785 sreq := &testpb.StreamingOutputCallRequest{ 786 ResponseType: testpb.PayloadType_COMPRESSABLE, 787 ResponseParameters: []*testpb.ResponseParameters{ 788 {Size: smallSize}, 789 }, 790 Payload: smallPayload, 791 } 792 793 if err := stream.Send(sreq); err != nil { 794 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 795 } 796 if _, err := stream.Recv(); err != nil { 797 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 798 } 799 800 rcw := l.getLastConn() 801 802 if rcw != nil { 803 rcw.writeRSTStream(tc.(*testServiceClientWrapper).getCurrentStreamID(), http2.ErrCodeCancel) 804 } 805 if _, err := stream.Recv(); err == nil { 806 t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err) 807 } 808} 809 810// this func is to be used to test client side counting of failed streams. 
811func doServerSideInitiatedFailedStreamWithGoAway(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) { 812 // This call is just to keep the transport from shutting down (socket will be deleted 813 // in this case, and we will not be able to get metrics). 814 s, err := tc.FullDuplexCall(context.Background()) 815 if err != nil { 816 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 817 } 818 if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{ 819 { 820 Size: 1, 821 }, 822 }}); err != nil { 823 t.Fatalf("s.Send() failed with error: %v", err) 824 } 825 if _, err := s.Recv(); err != nil { 826 t.Fatalf("s.Recv() failed with error: %v", err) 827 } 828 829 s, err = tc.FullDuplexCall(context.Background()) 830 if err != nil { 831 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 832 } 833 if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{ 834 { 835 Size: 1, 836 }, 837 }}); err != nil { 838 t.Fatalf("s.Send() failed with error: %v", err) 839 } 840 if _, err := s.Recv(); err != nil { 841 t.Fatalf("s.Recv() failed with error: %v", err) 842 } 843 844 rcw := l.getLastConn() 845 if rcw != nil { 846 rcw.writeGoAway(tc.(*testServiceClientWrapper).getCurrentStreamID()-2, http2.ErrCodeCancel, []byte{}) 847 } 848 if _, err := s.Recv(); err == nil { 849 t.Fatalf("%v.Recv() = %v, want <non-nil>", s, err) 850 } 851} 852 853func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) { 854 ctx, cancel := context.WithCancel(context.Background()) 855 _, err := tc.FullDuplexCall(ctx) 856 if err != nil { 857 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 858 } 859 // Allow for at least 2 keepalives (1s per ping interval) 860 time.Sleep(4 * time.Second) 861 cancel() 862} 863 864func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) { 865 czCleanup := channelz.NewChannelzStorage() 866 defer 
czCleanupWrapper(czCleanup, t) 867 e := tcpClearRREnv 868 te := newTest(t, e) 869 te.maxServerReceiveMsgSize = newInt(20) 870 te.maxClientReceiveMsgSize = newInt(20) 871 rcw := te.startServerWithConnControl(&testServer{security: e.security}) 872 defer te.tearDown() 873 cc := te.clientConn() 874 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 875 876 doSuccessfulUnaryCall(tc, t) 877 var scID, skID int64 878 if err := verifyResultWithDelay(func() (bool, error) { 879 tchan, _ := channelz.GetTopChannels(0, 0) 880 if len(tchan) != 1 { 881 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 882 } 883 if len(tchan[0].SubChans) != 1 { 884 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 885 } 886 887 for scID = range tchan[0].SubChans { 888 break 889 } 890 sc := channelz.GetSubChannel(scID) 891 if sc == nil { 892 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID) 893 } 894 if len(sc.Sockets) != 1 { 895 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) 896 } 897 for skID = range sc.Sockets { 898 break 899 } 900 skt := channelz.GetSocket(skID) 901 sktData := skt.SocketData 902 if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 { 903 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived) 904 } 905 return true, nil 906 }); err != nil { 907 t.Fatal(err) 908 } 909 910 doServerSideFailedUnaryCall(tc, t) 911 if err := verifyResultWithDelay(func() (bool, error) { 912 skt := channelz.GetSocket(skID) 913 sktData := skt.SocketData 914 if 
sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 1 { 915 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (2, 2, 2, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived) 916 } 917 return true, nil 918 }); err != nil { 919 t.Fatal(err) 920 } 921 922 doClientSideInitiatedFailedStream(tc, t) 923 if err := verifyResultWithDelay(func() (bool, error) { 924 skt := channelz.GetSocket(skID) 925 sktData := skt.SocketData 926 if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 3 || sktData.MessagesReceived != 2 { 927 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 3, 2), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 928 } 929 return true, nil 930 }); err != nil { 931 t.Fatal(err) 932 } 933 934 doServerSideInitiatedFailedStreamWithRSTStream(tc, t, rcw) 935 if err := verifyResultWithDelay(func() (bool, error) { 936 skt := channelz.GetSocket(skID) 937 sktData := skt.SocketData 938 if sktData.StreamsStarted != 4 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 2 || sktData.MessagesSent != 4 || sktData.MessagesReceived != 3 { 939 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (4, 2, 2, 4, 3), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 940 } 941 return true, nil 942 }); err != nil { 943 t.Fatal(err) 944 } 945 946 doServerSideInitiatedFailedStreamWithGoAway(tc, t, rcw) 947 
if err := verifyResultWithDelay(func() (bool, error) { 948 skt := channelz.GetSocket(skID) 949 sktData := skt.SocketData 950 if sktData.StreamsStarted != 6 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 3 || sktData.MessagesSent != 6 || sktData.MessagesReceived != 5 { 951 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (6, 2, 3, 6, 5), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 952 } 953 return true, nil 954 }); err != nil { 955 t.Fatal(err) 956 } 957} 958 959// This test is to complete TestCZClientSocketMetricsStreamsAndMessagesCount and 960// TestCZServerSocketMetricsStreamsAndMessagesCount by adding the test case of 961// server sending RST_STREAM to client due to client side flow control violation. 962// It is separated from other cases due to setup incompatibly, i.e. max receive 963// size violation will mask flow control violation. 964func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) { 965 czCleanup := channelz.NewChannelzStorage() 966 defer czCleanupWrapper(czCleanup, t) 967 e := tcpClearRREnv 968 te := newTest(t, e) 969 te.serverInitialWindowSize = 65536 970 // Avoid overflowing connection level flow control window, which will lead to 971 // transport being closed. 
972 te.serverInitialConnWindowSize = 65536 * 2 973 ts := &stubserver.StubServer{FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { 974 stream.Send(&testpb.StreamingOutputCallResponse{}) 975 <-stream.Context().Done() 976 return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled") 977 }} 978 te.startServer(ts) 979 defer te.tearDown() 980 cc, dw := te.clientConnWithConnControl() 981 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 982 983 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 984 stream, err := tc.FullDuplexCall(ctx) 985 if err != nil { 986 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 987 } 988 if _, err := stream.Recv(); err != nil { 989 t.Fatalf("stream.Recv() = %v, want nil", err) 990 } 991 go func() { 992 payload := make([]byte, 16384) 993 for i := 0; i < 6; i++ { 994 dw.getRawConnWrapper().writeRawFrame(http2.FrameData, 0, tc.getCurrentStreamID(), payload) 995 } 996 }() 997 if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted { 998 t.Fatalf("stream.Recv() = %v, want error code: %v", err, codes.ResourceExhausted) 999 } 1000 cancel() 1001 1002 if err := verifyResultWithDelay(func() (bool, error) { 1003 tchan, _ := channelz.GetTopChannels(0, 0) 1004 if len(tchan) != 1 { 1005 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 1006 } 1007 if len(tchan[0].SubChans) != 1 { 1008 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 1009 } 1010 var id int64 1011 for id = range tchan[0].SubChans { 1012 break 1013 } 1014 sc := channelz.GetSubChannel(id) 1015 if sc == nil { 1016 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) 1017 } 1018 if len(sc.Sockets) != 1 { 1019 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, 
len(sc.Sockets)) 1020 } 1021 for id = range sc.Sockets { 1022 break 1023 } 1024 skt := channelz.GetSocket(id) 1025 sktData := skt.SocketData 1026 if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 { 1027 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed) 1028 } 1029 ss, _ := channelz.GetServers(0, 0) 1030 if len(ss) != 1 { 1031 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1032 } 1033 1034 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0) 1035 if len(ns) != 1 { 1036 return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns)) 1037 } 1038 sktData = ns[0].SocketData 1039 if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 { 1040 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed) 1041 } 1042 return true, nil 1043 }); err != nil { 1044 t.Fatal(err) 1045 } 1046} 1047 1048func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { 1049 czCleanup := channelz.NewChannelzStorage() 1050 defer czCleanupWrapper(czCleanup, t) 1051 e := tcpClearRREnv 1052 te := newTest(t, e) 1053 // disable BDP 1054 te.serverInitialWindowSize = 65536 1055 te.serverInitialConnWindowSize = 65536 1056 te.clientInitialWindowSize = 65536 1057 te.clientInitialConnWindowSize = 65536 1058 te.startServer(&testServer{security: e.security}) 1059 defer te.tearDown() 1060 cc := te.clientConn() 1061 tc := testpb.NewTestServiceClient(cc) 1062 1063 for i := 0; i < 10; i++ { 1064 doSuccessfulUnaryCall(tc, t) 1065 } 1066 1067 var cliSktID, svrSktID int64 1068 if err := verifyResultWithDelay(func() (bool, error) { 1069 tchan, 
_ := channelz.GetTopChannels(0, 0) 1070 if len(tchan) != 1 { 1071 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 1072 } 1073 if len(tchan[0].SubChans) != 1 { 1074 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 1075 } 1076 var id int64 1077 for id = range tchan[0].SubChans { 1078 break 1079 } 1080 sc := channelz.GetSubChannel(id) 1081 if sc == nil { 1082 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) 1083 } 1084 if len(sc.Sockets) != 1 { 1085 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) 1086 } 1087 for id = range sc.Sockets { 1088 break 1089 } 1090 skt := channelz.GetSocket(id) 1091 sktData := skt.SocketData 1092 // 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486 1093 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 { 1094 return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1095 } 1096 ss, _ := channelz.GetServers(0, 0) 1097 if len(ss) != 1 { 1098 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1099 } 1100 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0) 1101 sktData = ns[0].SocketData 1102 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 { 1103 return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1104 } 1105 cliSktID, svrSktID = id, ss[0].ID 1106 return true, nil 1107 }); err != nil { 1108 t.Fatal(err) 1109 } 1110 1111 doStreamingInputCallWithLargePayload(tc, t) 1112 1113 if err := verifyResultWithDelay(func() (bool, error) { 1114 skt := 
channelz.GetSocket(cliSktID) 1115 sktData := skt.SocketData 1116 // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486 1117 // Remote: 65536 - 5 (Length-Prefixed-Message size) * 10 - 10011 = 55475 1118 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 { 1119 return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1120 } 1121 ss, _ := channelz.GetServers(0, 0) 1122 if len(ss) != 1 { 1123 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1124 } 1125 ns, _ := channelz.GetServerSockets(svrSktID, 0, 0) 1126 sktData = ns[0].SocketData 1127 if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 { 1128 return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1129 } 1130 return true, nil 1131 }); err != nil { 1132 t.Fatal(err) 1133 } 1134 1135 // triggers transport flow control window update on server side, since unacked 1136 // bytes should be larger than limit now. i.e. 50 + 20022 > 65536/4. 
1137 doStreamingInputCallWithLargePayload(tc, t) 1138 if err := verifyResultWithDelay(func() (bool, error) { 1139 skt := channelz.GetSocket(cliSktID) 1140 sktData := skt.SocketData 1141 // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486 1142 // Remote: 65536 1143 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 { 1144 return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1145 } 1146 ss, _ := channelz.GetServers(0, 0) 1147 if len(ss) != 1 { 1148 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1149 } 1150 ns, _ := channelz.GetServerSockets(svrSktID, 0, 0) 1151 sktData = ns[0].SocketData 1152 if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 { 1153 return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1154 } 1155 return true, nil 1156 }); err != nil { 1157 t.Fatal(err) 1158 } 1159} 1160 1161func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) { 1162 czCleanup := channelz.NewChannelzStorage() 1163 defer czCleanupWrapper(czCleanup, t) 1164 defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime) 1165 internal.KeepaliveMinPingTime = time.Second 1166 e := tcpClearRREnv 1167 te := newTest(t, e) 1168 te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams( 1169 keepalive.ClientParameters{ 1170 Time: time.Second, 1171 Timeout: 500 * time.Millisecond, 1172 PermitWithoutStream: true, 1173 })) 1174 te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy( 1175 keepalive.EnforcementPolicy{ 1176 MinTime: 500 * time.Millisecond, 1177 PermitWithoutStream: true, 1178 })) 1179 
te.startServer(&testServer{security: e.security}) 1180 te.clientConn() // Dial the server 1181 defer te.tearDown() 1182 if err := verifyResultWithDelay(func() (bool, error) { 1183 tchan, _ := channelz.GetTopChannels(0, 0) 1184 if len(tchan) != 1 { 1185 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 1186 } 1187 if len(tchan[0].SubChans) != 1 { 1188 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 1189 } 1190 var id int64 1191 for id = range tchan[0].SubChans { 1192 break 1193 } 1194 sc := channelz.GetSubChannel(id) 1195 if sc == nil { 1196 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) 1197 } 1198 if len(sc.Sockets) != 1 { 1199 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) 1200 } 1201 for id = range sc.Sockets { 1202 break 1203 } 1204 skt := channelz.GetSocket(id) 1205 if skt.SocketData.KeepAlivesSent != 2 { 1206 return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent) 1207 } 1208 return true, nil 1209 }); err != nil { 1210 t.Fatal(err) 1211 } 1212} 1213 1214func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) { 1215 czCleanup := channelz.NewChannelzStorage() 1216 defer czCleanupWrapper(czCleanup, t) 1217 e := tcpClearRREnv 1218 te := newTest(t, e) 1219 te.maxServerReceiveMsgSize = newInt(20) 1220 te.maxClientReceiveMsgSize = newInt(20) 1221 te.startServer(&testServer{security: e.security}) 1222 defer te.tearDown() 1223 cc, _ := te.clientConnWithConnControl() 1224 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 1225 1226 var svrID int64 1227 if err := verifyResultWithDelay(func() (bool, error) { 1228 ss, _ := channelz.GetServers(0, 0) 1229 if len(ss) != 1 { 1230 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1231 } 
1232 svrID = ss[0].ID 1233 return true, nil 1234 }); err != nil { 1235 t.Fatal(err) 1236 } 1237 1238 doSuccessfulUnaryCall(tc, t) 1239 if err := verifyResultWithDelay(func() (bool, error) { 1240 ns, _ := channelz.GetServerSockets(svrID, 0, 0) 1241 sktData := ns[0].SocketData 1242 if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 { 1243 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 1244 } 1245 return true, nil 1246 }); err != nil { 1247 t.Fatal(err) 1248 } 1249 1250 doServerSideFailedUnaryCall(tc, t) 1251 if err := verifyResultWithDelay(func() (bool, error) { 1252 ns, _ := channelz.GetServerSockets(svrID, 0, 0) 1253 sktData := ns[0].SocketData 1254 if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 { 1255 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 1256 } 1257 return true, nil 1258 }); err != nil { 1259 t.Fatal(err) 1260 } 1261 1262 doClientSideInitiatedFailedStream(tc, t) 1263 if err := verifyResultWithDelay(func() (bool, error) { 1264 ns, _ := channelz.GetServerSockets(svrID, 0, 0) 1265 sktData := ns[0].SocketData 1266 if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 2 { 1267 return false, fmt.Errorf("server socket metric with 
ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 1268 } 1269 return true, nil 1270 }); err != nil { 1271 t.Fatal(err) 1272 } 1273} 1274 1275func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) { 1276 czCleanup := channelz.NewChannelzStorage() 1277 defer czCleanupWrapper(czCleanup, t) 1278 e := tcpClearRREnv 1279 te := newTest(t, e) 1280 // We setup the server keepalive parameters to send one keepalive every 1281 // second, and verify that the actual number of keepalives is very close to 1282 // the number of seconds elapsed in the test. We had a bug wherein the 1283 // server was sending one keepalive every [Time+Timeout] instead of every 1284 // [Time] period, and since Timeout is configured to a low value here, we 1285 // should be able to verify that the fix works with the above mentioned 1286 // logic. 
1287 kpOption := grpc.KeepaliveParams(keepalive.ServerParameters{ 1288 Time: time.Second, 1289 Timeout: 100 * time.Millisecond, 1290 }) 1291 te.customServerOptions = append(te.customServerOptions, kpOption) 1292 te.startServer(&testServer{security: e.security}) 1293 defer te.tearDown() 1294 cc := te.clientConn() 1295 tc := testpb.NewTestServiceClient(cc) 1296 start := time.Now() 1297 doIdleCallToInvokeKeepAlive(tc, t) 1298 1299 if err := verifyResultWithDelay(func() (bool, error) { 1300 ss, _ := channelz.GetServers(0, 0) 1301 if len(ss) != 1 { 1302 return false, fmt.Errorf("there should be one server, not %d", len(ss)) 1303 } 1304 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0) 1305 if len(ns) != 1 { 1306 return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns)) 1307 } 1308 wantKeepalivesCount := int64(time.Since(start).Seconds()) - 1 1309 if gotKeepalivesCount := ns[0].SocketData.KeepAlivesSent; gotKeepalivesCount != wantKeepalivesCount { 1310 return false, fmt.Errorf("got keepalivesCount: %v, want keepalivesCount: %v", gotKeepalivesCount, wantKeepalivesCount) 1311 } 1312 return true, nil 1313 }); err != nil { 1314 t.Fatal(err) 1315 } 1316} 1317 1318var cipherSuites = []string{ 1319 "TLS_RSA_WITH_RC4_128_SHA", 1320 "TLS_RSA_WITH_3DES_EDE_CBC_SHA", 1321 "TLS_RSA_WITH_AES_128_CBC_SHA", 1322 "TLS_RSA_WITH_AES_256_CBC_SHA", 1323 "TLS_RSA_WITH_AES_128_GCM_SHA256", 1324 "TLS_RSA_WITH_AES_256_GCM_SHA384", 1325 "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA", 1326 "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA", 1327 "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA", 1328 "TLS_ECDHE_RSA_WITH_RC4_128_SHA", 1329 "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA", 1330 "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", 1331 "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA", 1332 "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", 1333 "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", 1334 "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", 1335 "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", 1336 "TLS_FALLBACK_SCSV", 1337 
"TLS_RSA_WITH_AES_128_CBC_SHA256", 1338 "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256", 1339 "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", 1340 "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305", 1341 "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305", 1342 "TLS_AES_128_GCM_SHA256", 1343 "TLS_AES_256_GCM_SHA384", 1344 "TLS_CHACHA20_POLY1305_SHA256", 1345} 1346 1347func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) { 1348 czCleanup := channelz.NewChannelzStorage() 1349 defer czCleanupWrapper(czCleanup, t) 1350 e := tcpTLSRREnv 1351 te := newTest(t, e) 1352 te.startServer(&testServer{security: e.security}) 1353 defer te.tearDown() 1354 te.clientConn() 1355 if err := verifyResultWithDelay(func() (bool, error) { 1356 tchan, _ := channelz.GetTopChannels(0, 0) 1357 if len(tchan) != 1 { 1358 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 1359 } 1360 if len(tchan[0].SubChans) != 1 { 1361 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 1362 } 1363 var id int64 1364 for id = range tchan[0].SubChans { 1365 break 1366 } 1367 sc := channelz.GetSubChannel(id) 1368 if sc == nil { 1369 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) 1370 } 1371 if len(sc.Sockets) != 1 { 1372 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) 1373 } 1374 for id = range sc.Sockets { 1375 break 1376 } 1377 skt := channelz.GetSocket(id) 1378 cert, _ := tls.LoadX509KeyPair(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem")) 1379 securityVal, ok := skt.SocketData.Security.(*credentials.TLSChannelzSecurityValue) 1380 if !ok { 1381 return false, fmt.Errorf("the SocketData.Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.SocketData.Security) 1382 } 1383 if !reflect.DeepEqual(securityVal.RemoteCertificate, cert.Certificate[0]) { 1384 return false, 
fmt.Errorf("SocketData.Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0]) 1385 } 1386 for _, v := range cipherSuites { 1387 if v == securityVal.StandardName { 1388 return true, nil 1389 } 1390 } 1391 return false, fmt.Errorf("SocketData.Security.StandardName got: %v, want it to be one of %v", securityVal.StandardName, cipherSuites) 1392 }); err != nil { 1393 t.Fatal(err) 1394 } 1395} 1396 1397func (s) TestCZChannelTraceCreationDeletion(t *testing.T) { 1398 czCleanup := channelz.NewChannelzStorage() 1399 defer czCleanupWrapper(czCleanup, t) 1400 e := tcpClearRREnv 1401 // avoid calling API to set balancer type, which will void service config's change of balancer. 1402 e.balancer = "" 1403 te := newTest(t, e) 1404 r := manual.NewBuilderWithScheme("whatever") 1405 resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}} 1406 r.InitialState(resolver.State{Addresses: resolvedAddrs}) 1407 te.resolverScheme = r.Scheme() 1408 te.clientConn(grpc.WithResolvers(r)) 1409 defer te.tearDown() 1410 var nestedConn int64 1411 if err := verifyResultWithDelay(func() (bool, error) { 1412 tcs, _ := channelz.GetTopChannels(0, 0) 1413 if len(tcs) != 1 { 1414 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 1415 } 1416 if len(tcs[0].NestedChans) != 1 { 1417 return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans)) 1418 } 1419 for k := range tcs[0].NestedChans { 1420 nestedConn = k 1421 } 1422 for _, e := range tcs[0].Trace.Events { 1423 if e.RefID == nestedConn && e.RefType != channelz.RefChannel { 1424 return false, fmt.Errorf("nested channel trace event shoud have RefChannel as RefType") 1425 } 1426 } 1427 ncm := channelz.GetChannel(nestedConn) 1428 if ncm.Trace == nil { 1429 return false, fmt.Errorf("trace for nested channel should not be empty") 1430 } 1431 if len(ncm.Trace.Events) == 0 { 1432 return 
false, fmt.Errorf("there should be at least one trace event for nested channel not 0") 1433 } 1434 if ncm.Trace.Events[0].Desc != "Channel Created" { 1435 return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc) 1436 } 1437 return true, nil 1438 }); err != nil { 1439 t.Fatal(err) 1440 } 1441 1442 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)}) 1443 1444 // wait for the shutdown of grpclb balancer 1445 if err := verifyResultWithDelay(func() (bool, error) { 1446 tcs, _ := channelz.GetTopChannels(0, 0) 1447 if len(tcs) != 1 { 1448 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 1449 } 1450 if len(tcs[0].NestedChans) != 0 { 1451 return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans)) 1452 } 1453 ncm := channelz.GetChannel(nestedConn) 1454 if ncm == nil { 1455 return false, fmt.Errorf("nested channel should still exist due to parent's trace reference") 1456 } 1457 if ncm.Trace == nil { 1458 return false, fmt.Errorf("trace for nested channel should not be empty") 1459 } 1460 if len(ncm.Trace.Events) == 0 { 1461 return false, fmt.Errorf("there should be at least one trace event for nested channel not 0") 1462 } 1463 if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" { 1464 return false, fmt.Errorf("the first trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[0].Desc) 1465 } 1466 return true, nil 1467 }); err != nil { 1468 t.Fatal(err) 1469 } 1470} 1471 1472func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) { 1473 czCleanup := channelz.NewChannelzStorage() 1474 defer czCleanupWrapper(czCleanup, t) 1475 e := tcpClearRREnv 1476 te := newTest(t, e) 1477 te.startServer(&testServer{security: e.security}) 1478 r := manual.NewBuilderWithScheme("whatever") 1479 
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}}) 1480 te.resolverScheme = r.Scheme() 1481 te.clientConn(grpc.WithResolvers(r)) 1482 defer te.tearDown() 1483 var subConn int64 1484 // Here, we just wait for all sockets to be up. In the future, if we implement 1485 // IDLE, we may need to make several rpc calls to create the sockets. 1486 if err := verifyResultWithDelay(func() (bool, error) { 1487 tcs, _ := channelz.GetTopChannels(0, 0) 1488 if len(tcs) != 1 { 1489 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 1490 } 1491 if len(tcs[0].SubChans) != 1 { 1492 return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans)) 1493 } 1494 for k := range tcs[0].SubChans { 1495 subConn = k 1496 } 1497 for _, e := range tcs[0].Trace.Events { 1498 if e.RefID == subConn && e.RefType != channelz.RefSubChannel { 1499 return false, fmt.Errorf("subchannel trace event shoud have RefType to be RefSubChannel") 1500 } 1501 } 1502 scm := channelz.GetSubChannel(subConn) 1503 if scm == nil { 1504 return false, fmt.Errorf("subChannel does not exist") 1505 } 1506 if scm.Trace == nil { 1507 return false, fmt.Errorf("trace for subChannel should not be empty") 1508 } 1509 if len(scm.Trace.Events) == 0 { 1510 return false, fmt.Errorf("there should be at least one trace event for subChannel not 0") 1511 } 1512 if scm.Trace.Events[0].Desc != "Subchannel Created" { 1513 return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc) 1514 } 1515 return true, nil 1516 }); err != nil { 1517 t.Fatal(err) 1518 } 1519 1520 // Wait for ready 1521 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 1522 defer cancel() 1523 for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() { 1524 if !te.cc.WaitForStateChange(ctx, src) { 1525 t.Fatalf("timed out waiting for state change. 
got %v; want %v", src, connectivity.Ready) 1526 } 1527 } 1528 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}}) 1529 // Wait for not-ready. 1530 for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() { 1531 if !te.cc.WaitForStateChange(ctx, src) { 1532 t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready) 1533 } 1534 } 1535 1536 if err := verifyResultWithDelay(func() (bool, error) { 1537 tcs, _ := channelz.GetTopChannels(0, 0) 1538 if len(tcs) != 1 { 1539 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 1540 } 1541 if len(tcs[0].SubChans) != 1 { 1542 return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans)) 1543 } 1544 scm := channelz.GetSubChannel(subConn) 1545 if scm == nil { 1546 return false, fmt.Errorf("subChannel should still exist due to parent's trace reference") 1547 } 1548 if scm.Trace == nil { 1549 return false, fmt.Errorf("trace for SubChannel should not be empty") 1550 } 1551 if len(scm.Trace.Events) == 0 { 1552 return false, fmt.Errorf("there should be at least one trace event for subChannel not 0") 1553 } 1554 if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want { 1555 return false, fmt.Errorf("the last trace event should be %q, not %q", want, got) 1556 } 1557 1558 return true, nil 1559 }); err != nil { 1560 t.Fatal(err) 1561 } 1562} 1563 1564func (s) TestCZChannelAddressResolutionChange(t *testing.T) { 1565 czCleanup := channelz.NewChannelzStorage() 1566 defer czCleanupWrapper(czCleanup, t) 1567 e := tcpClearRREnv 1568 e.balancer = "" 1569 te := newTest(t, e) 1570 te.startServer(&testServer{security: e.security}) 1571 r := manual.NewBuilderWithScheme("whatever") 1572 addrs := []resolver.Address{{Addr: te.srvAddr}} 1573 r.InitialState(resolver.State{Addresses: addrs}) 1574 te.resolverScheme = r.Scheme() 1575 te.clientConn(grpc.WithResolvers(r)) 1576 
defer te.tearDown() 1577 var cid int64 1578 // Here, we just wait for all sockets to be up. In the future, if we implement 1579 // IDLE, we may need to make several rpc calls to create the sockets. 1580 if err := verifyResultWithDelay(func() (bool, error) { 1581 tcs, _ := channelz.GetTopChannels(0, 0) 1582 if len(tcs) != 1 { 1583 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 1584 } 1585 cid = tcs[0].ID 1586 for i := len(tcs[0].Trace.Events) - 1; i >= 0; i-- { 1587 if strings.Contains(tcs[0].Trace.Events[i].Desc, "resolver returned new addresses") { 1588 break 1589 } 1590 if i == 0 { 1591 return false, fmt.Errorf("events do not contain expected address resolution from empty address state. Got: %+v", tcs[0].Trace.Events) 1592 } 1593 } 1594 return true, nil 1595 }); err != nil { 1596 t.Fatal(err) 1597 } 1598 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)}) 1599 1600 if err := verifyResultWithDelay(func() (bool, error) { 1601 cm := channelz.GetChannel(cid) 1602 for i := len(cm.Trace.Events) - 1; i >= 0; i-- { 1603 if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) { 1604 break 1605 } 1606 if i == 0 { 1607 return false, fmt.Errorf("events do not contain expected address resolution change of LB policy") 1608 } 1609 } 1610 return true, nil 1611 }); err != nil { 1612 t.Fatal(err) 1613 } 1614 1615 newSC := parseCfg(r, `{ 1616 "methodConfig": [ 1617 { 1618 "name": [ 1619 { 1620 "service": "grpc.testing.TestService", 1621 "method": "EmptyCall" 1622 } 1623 ], 1624 "waitForReady": false, 1625 "timeout": ".001s" 1626 } 1627 ] 1628}`) 1629 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC}) 1630 1631 if err := verifyResultWithDelay(func() (bool, error) { 1632 cm := channelz.GetChannel(cid) 1633 1634 var es []string 1635 for i := len(cm.Trace.Events) - 1; i >= 0; i-- { 1636 if 
strings.Contains(cm.Trace.Events[i].Desc, "service config updated") {
				break
			}
			es = append(es, cm.Trace.Events[i].Desc)
			if i == 0 {
				return false, fmt.Errorf("events do not contain expected address resolution of new service config\n Events:\n%v", strings.Join(es, "\n"))
			}
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: newSC})

	if err := verifyResultWithDelay(func() (bool, error) {
		cm := channelz.GetChannel(cid)
		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
			if strings.Contains(cm.Trace.Events[i].Desc, "resolver returned an empty address list") {
				break
			}
			if i == 0 {
				return false, fmt.Errorf("events do not contain expected address resolution of empty address")
			}
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZSubChannelPickedNewAddress starts three servers, stops the first two
// after a successful RPC, and verifies that the channel's single SubChannel
// records a trace event for picking the third server's address.
func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	e.balancer = ""
	te := newTest(t, e)
	te.startServers(&testServer{security: e.security}, 3)
	r := manual.NewBuilderWithScheme("whatever")
	svrAddrs := make([]resolver.Address, 0, len(te.srvAddrs))
	for _, a := range te.srvAddrs {
		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
	}
	r.InitialState(resolver.State{Addresses: svrAddrs})
	te.resolverScheme = r.Scheme()
	cc := te.clientConn(grpc.WithResolvers(r))
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(cc)
	// make sure the connection is up
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	te.srvs[0].Stop()
	te.srvs[1].Stop()
	// Only the third server is still running; wait until the SubChannel's trace
	// shows that it moved on to that server's address.
	wantDesc := fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2])
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].SubChans) != 1 {
			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
		}
		var subConn int64
		for k := range tcs[0].SubChans {
			subConn = k
		}
		scm := channelz.GetSubChannel(subConn)
		// GetSubChannel can return nil; report it and retry instead of
		// panicking on the nil dereference below.
		if scm == nil {
			return false, fmt.Errorf("subChannel %d does not exist", subConn)
		}
		if scm.Trace == nil {
			return false, fmt.Errorf("trace for SubChannel should not be empty")
		}
		if len(scm.Trace.Events) == 0 {
			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
		}
		// Search newest-first for the expected "picks a new address" event.
		for i := len(scm.Trace.Events) - 1; i >= 0; i-- {
			if scm.Trace.Events[i].Desc == wantDesc {
				return true, nil
			}
		}
		return false, fmt.Errorf("events do not contain expected address resolution of subchannel picked new address")
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZSubChannelConnectivityState verifies that a SubChannel's trace records
// exactly one READY transition, at least one CONNECTING and TRANSIENT_FAILURE
// transition after its server stops, and exactly one SHUTDOWN transition once
// the resolver replaces the server's address.
func (s) TestCZSubChannelConnectivityState(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
	te.resolverScheme = r.Scheme()
	cc := te.clientConn(grpc.WithResolvers(r))
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(cc)
	// make sure the connection is up
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	var subConn int64
	te.srv.Stop()

	readyDesc := fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)
	connectingDesc := fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting)
	transientDesc := fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure)
	shutdownDesc := fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown)

	if err := verifyResultWithDelay(func() (bool, error) {
		// We need to obtain the SubChannel id before it gets deleted from the
		// Channel's children list, which happens once the r.UpdateState call
		// below replaces the server's address with a fake one.
		if subConn == 0 {
			tcs, _ := channelz.GetTopChannels(0, 0)
			if len(tcs) != 1 {
				return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
			}
			if len(tcs[0].SubChans) != 1 {
				return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
			}
			for k := range tcs[0].SubChans {
				// get the SubChannel id for further trace inquiry.
				subConn = k
			}
		}
		scm := channelz.GetSubChannel(subConn)
		if scm == nil {
			return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
		}
		if scm.Trace == nil {
			return false, fmt.Errorf("trace for SubChannel should not be empty")
		}
		if len(scm.Trace.Events) == 0 {
			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
		}
		var ready, connecting, transient, shutdown int
		for _, ev := range scm.Trace.Events {
			if ev.Desc == transientDesc {
				transient++
			}
		}
		// Make sure the SubChannel has already seen transient failure before
		// shutting it down by replacing its address in the resolver state.
		if transient == 0 {
			return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
		}
		transient = 0
		r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
		for _, ev := range scm.Trace.Events {
			switch ev.Desc {
			case readyDesc:
				ready++
			case connectingDesc:
				connecting++
			case transientDesc:
				transient++
			case shutdownDesc:
				shutdown++
			}
		}
		// example:
		// Subchannel Created
		// Subchannel's connectivity state changed to CONNECTING
		// Subchannel picked a new address: "localhost:36011"
		// Subchannel's connectivity state changed to READY
		// Subchannel's connectivity state changed to TRANSIENT_FAILURE
		// Subchannel's connectivity state changed to CONNECTING
		// Subchannel picked a new address: "localhost:36011"
		// Subchannel's connectivity state changed to SHUTDOWN
		// Subchannel Deleted
		if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 {
			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown)
		}

		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZChannelConnectivityState verifies that the top channel's trace records
// exactly one READY transition and, after its only server stops, at least one
// CONNECTING and one TRANSIENT_FAILURE transition.
func (s) TestCZChannelConnectivityState(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
	te.resolverScheme = r.Scheme()
	cc := te.clientConn(grpc.WithResolvers(r))
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(cc)
	// make sure the connection is up
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	te.srv.Stop()

	readyDesc := fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready)
	connectingDesc := fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting)
	transientDesc := fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure)

	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}

		var ready, connecting, transient int
		for _, ev := range tcs[0].Trace.Events {
			switch ev.Desc {
			case readyDesc:
				ready++
			case connectingDesc:
				connecting++
			case transientDesc:
				transient++
			}
		}

		// example:
		// Channel Created
		// Addresses resolved (from empty address state): "localhost:40467"
		// SubChannel (id: 4[]) Created
		// Channel's connectivity state changed to CONNECTING
		// Channel's connectivity state changed to READY
		// Channel's connectivity state changed to TRANSIENT_FAILURE
		// Channel's connectivity state changed to CONNECTING
		// Channel's connectivity state changed to TRANSIENT_FAILURE
		if ready != 1 || connecting < 1 || transient < 1 {
			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient)
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZTraceOverwriteChannelDeletion verifies that, with a trace capacity of
// one entry, a nested (grpclb) channel is removed from channelz once its
// parent's trace no longer references it.
func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	// avoid newTest using WithBalancerName, which would override service
	// config's change of balancer below.
	e.balancer = ""
	te := newTest(t, e)
	channelz.SetMaxTraceEntry(1)
	defer channelz.ResetMaxTraceEntryToDefault()
	r := manual.NewBuilderWithScheme("whatever")
	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
	r.InitialState(resolver.State{Addresses: resolvedAddrs})
	te.resolverScheme = r.Scheme()
	te.clientConn(grpc.WithResolvers(r))
	defer te.tearDown()
	var nestedConn int64
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].NestedChans) != 1 {
			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
		}
		for k := range tcs[0].NestedChans {
			nestedConn = k
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})

	// wait for the shutdown of grpclb balancer
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].NestedChans) != 0 {
			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	// If the nested channel's deletion is the last trace event before the next
	// validation, the validation will fail, because the top channel's trace
	// still holds a reference to it. This update forces a newer trace event on
	// the top channel in that case.
	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})

	// verify that the nested channel no longer exists because the trace event
	// referencing it got overwritten.
	if err := verifyResultWithDelay(func() (bool, error) {
		cm := channelz.GetChannel(nestedConn)
		if cm != nil {
			return false, fmt.Errorf("nested channel should have been deleted since its parent's trace should not contain any reference to it anymore")
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZTraceOverwriteSubChannelDeletion verifies that, with a trace capacity
// of one entry, a SubChannel removed by a resolver update is deleted from
// channelz once its parent's trace no longer references it.
func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	te := newTest(t, e)
	channelz.SetMaxTraceEntry(1)
	defer channelz.ResetMaxTraceEntryToDefault()
	te.startServer(&testServer{security: e.security})
	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
	te.resolverScheme = r.Scheme()
	te.clientConn(grpc.WithResolvers(r))
	defer te.tearDown()
	var subConn int64
	// Wait for the top channel to report its single SubChannel and record its
	// ID. In the future, if we implement IDLE, we may need to make several rpc
	// calls to create the sockets.
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].SubChans) != 1 {
			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
		}
		for k := range tcs[0].SubChans {
			subConn = k
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	// Wait for ready
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
		if !te.cc.WaitForStateChange(ctx, src) {
			t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.Ready)
		}
	}
	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
	// Wait for not-ready.
	for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
		if !te.cc.WaitForStateChange(ctx, src) {
			t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
		}
	}

	// verify that the subchannel no longer exists because the trace event
	// referencing it got overwritten. subConn is a SubChannel ID, so it must be
	// queried with GetSubChannel; querying it with GetChannel would make this
	// check pass trivially.
	if err := verifyResultWithDelay(func() (bool, error) {
		scm := channelz.GetSubChannel(subConn)
		if scm != nil {
			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZTraceTopChannelDeletionTraceClear verifies that closing the top channel
// (via te.tearDown) clears its trace, so its SubChannel is deleted from
// channelz as well.
func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	// Ensure cleanup even if a check below fails before the explicit tearDown;
	// calling tearDown twice follows the pattern used elsewhere in this file.
	defer te.tearDown()
	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
	te.resolverScheme = r.Scheme()
	te.clientConn(grpc.WithResolvers(r))
	var subConn int64
	// Wait for the top channel to report its single SubChannel and record its
	// ID. In the future, if we implement IDLE, we may need to make several rpc
	// calls to create the sockets.
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].SubChans) != 1 {
			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
		}
		for k := range tcs[0].SubChans {
			subConn = k
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
	te.tearDown()
	// verify that the subchannel no longer exists because its parent channel
	// got deleted and its trace cleared. subConn is a SubChannel ID, so it must
	// be queried with GetSubChannel.
	if err := verifyResultWithDelay(func() (bool, error) {
		scm := channelz.GetSubChannel(subConn)
		if scm != nil {
			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}