1/* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package test 20 21import ( 22 "context" 23 "crypto/tls" 24 "fmt" 25 "net" 26 "reflect" 27 "strings" 28 "sync" 29 "testing" 30 "time" 31 32 "golang.org/x/net/http2" 33 "google.golang.org/grpc" 34 _ "google.golang.org/grpc/balancer/grpclb" 35 "google.golang.org/grpc/balancer/roundrobin" 36 "google.golang.org/grpc/codes" 37 "google.golang.org/grpc/connectivity" 38 "google.golang.org/grpc/credentials" 39 "google.golang.org/grpc/internal" 40 "google.golang.org/grpc/internal/channelz" 41 "google.golang.org/grpc/keepalive" 42 "google.golang.org/grpc/resolver" 43 "google.golang.org/grpc/resolver/manual" 44 "google.golang.org/grpc/status" 45 testpb "google.golang.org/grpc/test/grpc_testing" 46 "google.golang.org/grpc/testdata" 47) 48 49func czCleanupWrapper(cleanup func() error, t *testing.T) { 50 if err := cleanup(); err != nil { 51 t.Error(err) 52 } 53} 54 55func verifyResultWithDelay(f func() (bool, error)) error { 56 var ok bool 57 var err error 58 for i := 0; i < 1000; i++ { 59 if ok, err = f(); ok { 60 return nil 61 } 62 time.Sleep(10 * time.Millisecond) 63 } 64 return err 65} 66 67func (s) TestCZServerRegistrationAndDeletion(t *testing.T) { 68 testcases := []struct { 69 total int 70 start int64 71 max int64 72 length int64 73 end bool 74 }{ 75 {total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true}, 76 {total: 
int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true}, 77 {total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false}, 78 {total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true}, 79 {total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false}, 80 {total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false}, 81 } 82 83 for _, c := range testcases { 84 czCleanup := channelz.NewChannelzStorage() 85 defer czCleanupWrapper(czCleanup, t) 86 e := tcpClearRREnv 87 te := newTest(t, e) 88 te.startServers(&testServer{security: e.security}, c.total) 89 90 ss, end := channelz.GetServers(c.start, c.max) 91 if int64(len(ss)) != c.length || end != c.end { 92 t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end) 93 } 94 te.tearDown() 95 ss, end = channelz.GetServers(c.start, c.max) 96 if len(ss) != 0 || !end { 97 t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end) 98 } 99 } 100} 101 102func (s) TestCZGetServer(t *testing.T) { 103 czCleanup := channelz.NewChannelzStorage() 104 defer czCleanupWrapper(czCleanup, t) 105 e := tcpClearRREnv 106 te := newTest(t, e) 107 te.startServer(&testServer{security: e.security}) 108 defer te.tearDown() 109 110 ss, _ := channelz.GetServers(0, 0) 111 if len(ss) != 1 { 112 t.Fatalf("there should only be one server, not %d", len(ss)) 113 } 114 115 serverID := ss[0].ID 116 srv := channelz.GetServer(serverID) 117 if srv == nil { 118 t.Fatalf("server %d does not exist", serverID) 119 } 120 if srv.ID != serverID { 121 t.Fatalf("server want id %d, but got %d", serverID, srv.ID) 122 } 123 124 te.tearDown() 125 126 if err := verifyResultWithDelay(func() (bool, error) { 127 
srv := channelz.GetServer(serverID) 128 if srv != nil { 129 return false, fmt.Errorf("server %d should not exist", serverID) 130 } 131 132 return true, nil 133 }); err != nil { 134 t.Fatal(err) 135 } 136} 137 138func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) { 139 testcases := []struct { 140 total int 141 start int64 142 max int64 143 length int64 144 end bool 145 }{ 146 {total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true}, 147 {total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true}, 148 {total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false}, 149 {total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true}, 150 {total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false}, 151 {total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false}, 152 } 153 154 for _, c := range testcases { 155 czCleanup := channelz.NewChannelzStorage() 156 defer czCleanupWrapper(czCleanup, t) 157 e := tcpClearRREnv 158 te := newTest(t, e) 159 var ccs []*grpc.ClientConn 160 for i := 0; i < c.total; i++ { 161 cc := te.clientConn() 162 te.cc = nil 163 // avoid making next dial blocking 164 te.srvAddr = "" 165 ccs = append(ccs, cc) 166 } 167 if err := verifyResultWithDelay(func() (bool, error) { 168 if tcs, end := channelz.GetTopChannels(c.start, c.max); int64(len(tcs)) != c.length || end != c.end { 169 return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end) 170 } 171 return true, nil 172 }); err != nil { 173 t.Fatal(err) 174 } 175 176 for _, cc := range ccs { 177 cc.Close() 178 } 179 180 if err := verifyResultWithDelay(func() (bool, error) { 181 if tcs, end := 
channelz.GetTopChannels(c.start, c.max); len(tcs) != 0 || !end { 182 return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end) 183 } 184 return true, nil 185 }); err != nil { 186 t.Fatal(err) 187 } 188 te.tearDown() 189 } 190} 191 192func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) { 193 czCleanup := channelz.NewChannelzStorage() 194 defer czCleanupWrapper(czCleanup, t) 195 // Make dial fails (due to no transport security specified) 196 _, err := grpc.Dial("fake.addr") 197 if err == nil { 198 t.Fatal("expecting dial to fail") 199 } 200 if tcs, end := channelz.GetTopChannels(0, 0); tcs != nil || !end { 201 t.Fatalf("GetTopChannels(0, 0) = %v, %v, want <nil>, true", tcs, end) 202 } 203} 204 205func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) { 206 czCleanup := channelz.NewChannelzStorage() 207 defer czCleanupWrapper(czCleanup, t) 208 e := tcpClearRREnv 209 // avoid calling API to set balancer type, which will void service config's change of balancer. 
210 e.balancer = "" 211 te := newTest(t, e) 212 r, cleanup := manual.GenerateAndRegisterManualResolver() 213 defer cleanup() 214 resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}} 215 r.InitialState(resolver.State{Addresses: resolvedAddrs}) 216 te.resolverScheme = r.Scheme() 217 te.clientConn() 218 defer te.tearDown() 219 220 if err := verifyResultWithDelay(func() (bool, error) { 221 tcs, _ := channelz.GetTopChannels(0, 0) 222 if len(tcs) != 1 { 223 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 224 } 225 if len(tcs[0].NestedChans) != 1 { 226 return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans)) 227 } 228 return true, nil 229 }); err != nil { 230 t.Fatal(err) 231 } 232 233 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)}) 234 235 // wait for the shutdown of grpclb balancer 236 if err := verifyResultWithDelay(func() (bool, error) { 237 tcs, _ := channelz.GetTopChannels(0, 0) 238 if len(tcs) != 1 { 239 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 240 } 241 if len(tcs[0].NestedChans) != 0 { 242 return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans)) 243 } 244 return true, nil 245 }); err != nil { 246 t.Fatal(err) 247 } 248} 249 250func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) { 251 czCleanup := channelz.NewChannelzStorage() 252 defer czCleanupWrapper(czCleanup, t) 253 e := tcpClearRREnv 254 num := 3 // number of backends 255 te := newTest(t, e) 256 var svrAddrs []resolver.Address 257 te.startServers(&testServer{security: e.security}, num) 258 r, cleanup := manual.GenerateAndRegisterManualResolver() 259 defer cleanup() 260 for _, a := range te.srvAddrs { 261 svrAddrs = append(svrAddrs, 
resolver.Address{Addr: a}) 262 } 263 r.InitialState(resolver.State{Addresses: svrAddrs}) 264 te.resolverScheme = r.Scheme() 265 te.clientConn() 266 defer te.tearDown() 267 // Here, we just wait for all sockets to be up. In the future, if we implement 268 // IDLE, we may need to make several rpc calls to create the sockets. 269 if err := verifyResultWithDelay(func() (bool, error) { 270 tcs, _ := channelz.GetTopChannels(0, 0) 271 if len(tcs) != 1 { 272 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 273 } 274 if len(tcs[0].SubChans) != num { 275 return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans)) 276 } 277 count := 0 278 for k := range tcs[0].SubChans { 279 sc := channelz.GetSubChannel(k) 280 if sc == nil { 281 return false, fmt.Errorf("got <nil> subchannel") 282 } 283 count += len(sc.Sockets) 284 } 285 if count != num { 286 return false, fmt.Errorf("there should be %d sockets not %d", num, count) 287 } 288 289 return true, nil 290 }); err != nil { 291 t.Fatal(err) 292 } 293 294 r.UpdateState(resolver.State{Addresses: svrAddrs[:len(svrAddrs)-1]}) 295 296 if err := verifyResultWithDelay(func() (bool, error) { 297 tcs, _ := channelz.GetTopChannels(0, 0) 298 if len(tcs) != 1 { 299 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 300 } 301 if len(tcs[0].SubChans) != num-1 { 302 return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(tcs[0].SubChans)) 303 } 304 count := 0 305 for k := range tcs[0].SubChans { 306 sc := channelz.GetSubChannel(k) 307 if sc == nil { 308 return false, fmt.Errorf("got <nil> subchannel") 309 } 310 count += len(sc.Sockets) 311 } 312 if count != num-1 { 313 return false, fmt.Errorf("there should be %d sockets not %d", num-1, count) 314 } 315 316 return true, nil 317 }); err != nil { 318 t.Fatal(err) 319 } 320} 321 322func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) { 323 testcases := []struct { 324 total 
int 325 start int64 326 max int64 327 length int64 328 end bool 329 }{ 330 {total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true}, 331 {total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true}, 332 {total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false}, 333 {total: int(channelz.EntryPerPage), start: 1, max: 0, length: channelz.EntryPerPage - 1, end: true}, 334 {total: int(channelz.EntryPerPage) + 1, start: channelz.EntryPerPage + 1, max: 0, length: 0, end: true}, 335 {total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false}, 336 {total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false}, 337 } 338 339 for _, c := range testcases { 340 czCleanup := channelz.NewChannelzStorage() 341 defer czCleanupWrapper(czCleanup, t) 342 e := tcpClearRREnv 343 te := newTest(t, e) 344 te.startServer(&testServer{security: e.security}) 345 var ccs []*grpc.ClientConn 346 for i := 0; i < c.total; i++ { 347 cc := te.clientConn() 348 te.cc = nil 349 ccs = append(ccs, cc) 350 } 351 352 var svrID int64 353 if err := verifyResultWithDelay(func() (bool, error) { 354 ss, _ := channelz.GetServers(0, 0) 355 if len(ss) != 1 { 356 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 357 } 358 if len(ss[0].ListenSockets) != 1 { 359 return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets)) 360 } 361 362 startID := c.start 363 if startID != 0 { 364 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, int64(c.total)) 365 if int64(len(ns)) < c.start { 366 return false, fmt.Errorf("there should more than %d sockets, not %d", len(ns), c.start) 367 } 368 startID = ns[c.start-1].ID + 1 369 } 370 371 ns, end := channelz.GetServerSockets(ss[0].ID, startID, c.max) 372 if int64(len(ns)) != c.length || end != c.end { 373 return 
false, fmt.Errorf("GetServerSockets(%d) = %+v (len of which: %d), end: %+v, want len(GetServerSockets(%d)) = %d, end: %+v", c.start, ns, len(ns), end, c.start, c.length, c.end) 374 } 375 376 svrID = ss[0].ID 377 return true, nil 378 }); err != nil { 379 t.Fatal(err) 380 } 381 382 for _, cc := range ccs { 383 cc.Close() 384 } 385 386 if err := verifyResultWithDelay(func() (bool, error) { 387 ns, _ := channelz.GetServerSockets(svrID, c.start, c.max) 388 if len(ns) != 0 { 389 return false, fmt.Errorf("there should be %d normal sockets not %d", 0, len(ns)) 390 } 391 return true, nil 392 }); err != nil { 393 t.Fatal(err) 394 } 395 te.tearDown() 396 } 397} 398 399func (s) TestCZServerListenSocketDeletion(t *testing.T) { 400 czCleanup := channelz.NewChannelzStorage() 401 defer czCleanupWrapper(czCleanup, t) 402 s := grpc.NewServer() 403 lis, err := net.Listen("tcp", "localhost:0") 404 if err != nil { 405 t.Fatalf("failed to listen: %v", err) 406 } 407 go s.Serve(lis) 408 if err := verifyResultWithDelay(func() (bool, error) { 409 ss, _ := channelz.GetServers(0, 0) 410 if len(ss) != 1 { 411 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 412 } 413 if len(ss[0].ListenSockets) != 1 { 414 return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets)) 415 } 416 return true, nil 417 }); err != nil { 418 t.Fatal(err) 419 } 420 421 lis.Close() 422 if err := verifyResultWithDelay(func() (bool, error) { 423 ss, _ := channelz.GetServers(0, 0) 424 if len(ss) != 1 { 425 return false, fmt.Errorf("there should be 1 server, not %d", len(ss)) 426 } 427 if len(ss[0].ListenSockets) != 0 { 428 return false, fmt.Errorf("there should only be %d server listen socket, not %d", 0, len(ss[0].ListenSockets)) 429 } 430 return true, nil 431 }); err != nil { 432 t.Fatal(err) 433 } 434 s.Stop() 435} 436 437type dummyChannel struct{} 438 439func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 440 return 
&channelz.ChannelInternalMetric{} 441} 442 443type dummySocket struct{} 444 445func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric { 446 return &channelz.SocketInternalMetric{} 447} 448 449func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) { 450 // +--+TopChan+---+ 451 // | | 452 // v v 453 // +-+SubChan1+--+ SubChan2 454 // | | 455 // v v 456 // Socket1 Socket2 457 czCleanup := channelz.NewChannelzStorage() 458 defer czCleanupWrapper(czCleanup, t) 459 topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "") 460 subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") 461 subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") 462 sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") 463 sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") 464 465 tcs, _ := channelz.GetTopChannels(0, 0) 466 if tcs == nil || len(tcs) != 1 { 467 t.Fatalf("There should be one TopChannel entry") 468 } 469 if len(tcs[0].SubChans) != 2 { 470 t.Fatalf("There should be two SubChannel entries") 471 } 472 sc := channelz.GetSubChannel(subChanID1) 473 if sc == nil || len(sc.Sockets) != 2 { 474 t.Fatalf("There should be two Socket entries") 475 } 476 477 channelz.RemoveEntry(topChanID) 478 tcs, _ = channelz.GetTopChannels(0, 0) 479 if tcs == nil || len(tcs) != 1 { 480 t.Fatalf("There should be one TopChannel entry") 481 } 482 483 channelz.RemoveEntry(subChanID1) 484 channelz.RemoveEntry(subChanID2) 485 tcs, _ = channelz.GetTopChannels(0, 0) 486 if tcs == nil || len(tcs) != 1 { 487 t.Fatalf("There should be one TopChannel entry") 488 } 489 if len(tcs[0].SubChans) != 1 { 490 t.Fatalf("There should be one SubChannel entry") 491 } 492 493 channelz.RemoveEntry(sktID1) 494 channelz.RemoveEntry(sktID2) 495 tcs, _ = channelz.GetTopChannels(0, 0) 496 if tcs != nil { 497 t.Fatalf("There should be no TopChannel entry") 498 } 499} 500 501func (s) TestCZChannelMetrics(t *testing.T) { 502 czCleanup := 
channelz.NewChannelzStorage() 503 defer czCleanupWrapper(czCleanup, t) 504 e := tcpClearRREnv 505 num := 3 // number of backends 506 te := newTest(t, e) 507 te.maxClientSendMsgSize = newInt(8) 508 var svrAddrs []resolver.Address 509 te.startServers(&testServer{security: e.security}, num) 510 r, cleanup := manual.GenerateAndRegisterManualResolver() 511 defer cleanup() 512 for _, a := range te.srvAddrs { 513 svrAddrs = append(svrAddrs, resolver.Address{Addr: a}) 514 } 515 r.InitialState(resolver.State{Addresses: svrAddrs}) 516 te.resolverScheme = r.Scheme() 517 cc := te.clientConn() 518 defer te.tearDown() 519 tc := testpb.NewTestServiceClient(cc) 520 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 521 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 522 } 523 524 const smallSize = 1 525 const largeSize = 8 526 527 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 528 if err != nil { 529 t.Fatal(err) 530 } 531 req := &testpb.SimpleRequest{ 532 ResponseType: testpb.PayloadType_COMPRESSABLE, 533 ResponseSize: int32(smallSize), 534 Payload: largePayload, 535 } 536 537 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 538 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 539 } 540 541 stream, err := tc.FullDuplexCall(context.Background()) 542 if err != nil { 543 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 544 } 545 defer stream.CloseSend() 546 // Here, we just wait for all sockets to be up. In the future, if we implement 547 // IDLE, we may need to make several rpc calls to create the sockets. 
548 if err := verifyResultWithDelay(func() (bool, error) { 549 tcs, _ := channelz.GetTopChannels(0, 0) 550 if len(tcs) != 1 { 551 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 552 } 553 if len(tcs[0].SubChans) != num { 554 return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans)) 555 } 556 var cst, csu, cf int64 557 for k := range tcs[0].SubChans { 558 sc := channelz.GetSubChannel(k) 559 if sc == nil { 560 return false, fmt.Errorf("got <nil> subchannel") 561 } 562 cst += sc.ChannelData.CallsStarted 563 csu += sc.ChannelData.CallsSucceeded 564 cf += sc.ChannelData.CallsFailed 565 } 566 if cst != 3 { 567 return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst) 568 } 569 if csu != 1 { 570 return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu) 571 } 572 if cf != 1 { 573 return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf) 574 } 575 if tcs[0].ChannelData.CallsStarted != 3 { 576 return false, fmt.Errorf("there should be 3 CallsStarted not %d", tcs[0].ChannelData.CallsStarted) 577 } 578 if tcs[0].ChannelData.CallsSucceeded != 1 { 579 return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", tcs[0].ChannelData.CallsSucceeded) 580 } 581 if tcs[0].ChannelData.CallsFailed != 1 { 582 return false, fmt.Errorf("there should be 1 CallsFailed not %d", tcs[0].ChannelData.CallsFailed) 583 } 584 return true, nil 585 }); err != nil { 586 t.Fatal(err) 587 } 588} 589 590func (s) TestCZServerMetrics(t *testing.T) { 591 czCleanup := channelz.NewChannelzStorage() 592 defer czCleanupWrapper(czCleanup, t) 593 e := tcpClearRREnv 594 te := newTest(t, e) 595 te.maxServerReceiveMsgSize = newInt(8) 596 te.startServer(&testServer{security: e.security}) 597 defer te.tearDown() 598 cc := te.clientConn() 599 tc := testpb.NewTestServiceClient(cc) 600 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 601 t.Fatalf("TestService/EmptyCall(_, _) = _, 
%v, want _, <nil>", err) 602 } 603 604 const smallSize = 1 605 const largeSize = 8 606 607 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 608 if err != nil { 609 t.Fatal(err) 610 } 611 req := &testpb.SimpleRequest{ 612 ResponseType: testpb.PayloadType_COMPRESSABLE, 613 ResponseSize: int32(smallSize), 614 Payload: largePayload, 615 } 616 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 617 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 618 } 619 620 stream, err := tc.FullDuplexCall(context.Background()) 621 if err != nil { 622 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 623 } 624 defer stream.CloseSend() 625 626 if err := verifyResultWithDelay(func() (bool, error) { 627 ss, _ := channelz.GetServers(0, 0) 628 if len(ss) != 1 { 629 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 630 } 631 if ss[0].ServerData.CallsStarted != 3 { 632 return false, fmt.Errorf("there should be 3 CallsStarted not %d", ss[0].ServerData.CallsStarted) 633 } 634 if ss[0].ServerData.CallsSucceeded != 1 { 635 return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", ss[0].ServerData.CallsSucceeded) 636 } 637 if ss[0].ServerData.CallsFailed != 1 { 638 return false, fmt.Errorf("there should be 1 CallsFailed not %d", ss[0].ServerData.CallsFailed) 639 } 640 return true, nil 641 }); err != nil { 642 t.Fatal(err) 643 } 644} 645 646type testServiceClientWrapper struct { 647 testpb.TestServiceClient 648 mu sync.RWMutex 649 streamsCreated int 650} 651 652func (t *testServiceClientWrapper) getCurrentStreamID() uint32 { 653 t.mu.RLock() 654 defer t.mu.RUnlock() 655 return uint32(2*t.streamsCreated - 1) 656} 657 658func (t *testServiceClientWrapper) EmptyCall(ctx context.Context, in *testpb.Empty, opts ...grpc.CallOption) (*testpb.Empty, error) { 659 t.mu.Lock() 660 defer t.mu.Unlock() 661 
t.streamsCreated++ 662 return t.TestServiceClient.EmptyCall(ctx, in, opts...) 663} 664 665func (t *testServiceClientWrapper) UnaryCall(ctx context.Context, in *testpb.SimpleRequest, opts ...grpc.CallOption) (*testpb.SimpleResponse, error) { 666 t.mu.Lock() 667 defer t.mu.Unlock() 668 t.streamsCreated++ 669 return t.TestServiceClient.UnaryCall(ctx, in, opts...) 670} 671 672func (t *testServiceClientWrapper) StreamingOutputCall(ctx context.Context, in *testpb.StreamingOutputCallRequest, opts ...grpc.CallOption) (testpb.TestService_StreamingOutputCallClient, error) { 673 t.mu.Lock() 674 defer t.mu.Unlock() 675 t.streamsCreated++ 676 return t.TestServiceClient.StreamingOutputCall(ctx, in, opts...) 677} 678 679func (t *testServiceClientWrapper) StreamingInputCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_StreamingInputCallClient, error) { 680 t.mu.Lock() 681 defer t.mu.Unlock() 682 t.streamsCreated++ 683 return t.TestServiceClient.StreamingInputCall(ctx, opts...) 684} 685 686func (t *testServiceClientWrapper) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_FullDuplexCallClient, error) { 687 t.mu.Lock() 688 defer t.mu.Unlock() 689 t.streamsCreated++ 690 return t.TestServiceClient.FullDuplexCall(ctx, opts...) 691} 692 693func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_HalfDuplexCallClient, error) { 694 t.mu.Lock() 695 defer t.mu.Unlock() 696 t.streamsCreated++ 697 return t.TestServiceClient.HalfDuplexCall(ctx, opts...) 
698} 699 700func doSuccessfulUnaryCall(tc testpb.TestServiceClient, t *testing.T) { 701 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 702 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 703 } 704} 705 706func doStreamingInputCallWithLargePayload(tc testpb.TestServiceClient, t *testing.T) { 707 s, err := tc.StreamingInputCall(context.Background()) 708 if err != nil { 709 t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err) 710 } 711 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10000) 712 if err != nil { 713 t.Fatal(err) 714 } 715 s.Send(&testpb.StreamingInputCallRequest{Payload: payload}) 716} 717 718func doServerSideFailedUnaryCall(tc testpb.TestServiceClient, t *testing.T) { 719 const smallSize = 1 720 const largeSize = 2000 721 722 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 723 if err != nil { 724 t.Fatal(err) 725 } 726 req := &testpb.SimpleRequest{ 727 ResponseType: testpb.PayloadType_COMPRESSABLE, 728 ResponseSize: int32(smallSize), 729 Payload: largePayload, 730 } 731 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 732 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 733 } 734} 735 736func doClientSideInitiatedFailedStream(tc testpb.TestServiceClient, t *testing.T) { 737 ctx, cancel := context.WithCancel(context.Background()) 738 stream, err := tc.FullDuplexCall(ctx) 739 if err != nil { 740 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 741 } 742 743 const smallSize = 1 744 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 745 if err != nil { 746 t.Fatal(err) 747 } 748 749 sreq := &testpb.StreamingOutputCallRequest{ 750 ResponseType: testpb.PayloadType_COMPRESSABLE, 751 ResponseParameters: []*testpb.ResponseParameters{ 752 {Size: smallSize}, 753 }, 754 Payload: smallPayload, 755 } 
756 757 if err := stream.Send(sreq); err != nil { 758 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 759 } 760 if _, err := stream.Recv(); err != nil { 761 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 762 } 763 // By canceling the call, the client will send rst_stream to end the call, and 764 // the stream will failed as a result. 765 cancel() 766} 767 768// This func is to be used to test client side counting of failed streams. 769func doServerSideInitiatedFailedStreamWithRSTStream(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) { 770 stream, err := tc.FullDuplexCall(context.Background()) 771 if err != nil { 772 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 773 } 774 775 const smallSize = 1 776 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 777 if err != nil { 778 t.Fatal(err) 779 } 780 781 sreq := &testpb.StreamingOutputCallRequest{ 782 ResponseType: testpb.PayloadType_COMPRESSABLE, 783 ResponseParameters: []*testpb.ResponseParameters{ 784 {Size: smallSize}, 785 }, 786 Payload: smallPayload, 787 } 788 789 if err := stream.Send(sreq); err != nil { 790 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 791 } 792 if _, err := stream.Recv(); err != nil { 793 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 794 } 795 796 rcw := l.getLastConn() 797 798 if rcw != nil { 799 rcw.writeRSTStream(tc.(*testServiceClientWrapper).getCurrentStreamID(), http2.ErrCodeCancel) 800 } 801 if _, err := stream.Recv(); err == nil { 802 t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err) 803 } 804} 805 806// this func is to be used to test client side counting of failed streams. 807func doServerSideInitiatedFailedStreamWithGoAway(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) { 808 // This call is just to keep the transport from shutting down (socket will be deleted 809 // in this case, and we will not be able to get metrics). 
810 s, err := tc.FullDuplexCall(context.Background()) 811 if err != nil { 812 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 813 } 814 if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{ 815 { 816 Size: 1, 817 }, 818 }}); err != nil { 819 t.Fatalf("s.Send() failed with error: %v", err) 820 } 821 if _, err := s.Recv(); err != nil { 822 t.Fatalf("s.Recv() failed with error: %v", err) 823 } 824 825 s, err = tc.FullDuplexCall(context.Background()) 826 if err != nil { 827 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 828 } 829 if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{ 830 { 831 Size: 1, 832 }, 833 }}); err != nil { 834 t.Fatalf("s.Send() failed with error: %v", err) 835 } 836 if _, err := s.Recv(); err != nil { 837 t.Fatalf("s.Recv() failed with error: %v", err) 838 } 839 840 rcw := l.getLastConn() 841 if rcw != nil { 842 rcw.writeGoAway(tc.(*testServiceClientWrapper).getCurrentStreamID()-2, http2.ErrCodeCancel, []byte{}) 843 } 844 if _, err := s.Recv(); err == nil { 845 t.Fatalf("%v.Recv() = %v, want <non-nil>", s, err) 846 } 847} 848 849func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) { 850 ctx, cancel := context.WithCancel(context.Background()) 851 _, err := tc.FullDuplexCall(ctx) 852 if err != nil { 853 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 854 } 855 // Allow for at least 2 keepalives (1s per ping interval) 856 time.Sleep(4 * time.Second) 857 cancel() 858} 859 860func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) { 861 czCleanup := channelz.NewChannelzStorage() 862 defer czCleanupWrapper(czCleanup, t) 863 e := tcpClearRREnv 864 te := newTest(t, e) 865 te.maxServerReceiveMsgSize = newInt(20) 866 te.maxClientReceiveMsgSize = newInt(20) 867 rcw := te.startServerWithConnControl(&testServer{security: e.security}) 868 defer te.tearDown() 869 cc := 
te.clientConn() 870 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 871 872 doSuccessfulUnaryCall(tc, t) 873 var scID, skID int64 874 if err := verifyResultWithDelay(func() (bool, error) { 875 tchan, _ := channelz.GetTopChannels(0, 0) 876 if len(tchan) != 1 { 877 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 878 } 879 if len(tchan[0].SubChans) != 1 { 880 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 881 } 882 883 for scID = range tchan[0].SubChans { 884 break 885 } 886 sc := channelz.GetSubChannel(scID) 887 if sc == nil { 888 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID) 889 } 890 if len(sc.Sockets) != 1 { 891 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) 892 } 893 for skID = range sc.Sockets { 894 break 895 } 896 skt := channelz.GetSocket(skID) 897 sktData := skt.SocketData 898 if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 { 899 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived) 900 } 901 return true, nil 902 }); err != nil { 903 t.Fatal(err) 904 } 905 906 doServerSideFailedUnaryCall(tc, t) 907 if err := verifyResultWithDelay(func() (bool, error) { 908 skt := channelz.GetSocket(skID) 909 sktData := skt.SocketData 910 if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 1 { 911 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (2, 2, 2, 1), got (%d, %d, %d, %d)", 
skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived) 912 } 913 return true, nil 914 }); err != nil { 915 t.Fatal(err) 916 } 917 918 doClientSideInitiatedFailedStream(tc, t) 919 if err := verifyResultWithDelay(func() (bool, error) { 920 skt := channelz.GetSocket(skID) 921 sktData := skt.SocketData 922 if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 3 || sktData.MessagesReceived != 2 { 923 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 3, 2), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 924 } 925 return true, nil 926 }); err != nil { 927 t.Fatal(err) 928 } 929 930 doServerSideInitiatedFailedStreamWithRSTStream(tc, t, rcw) 931 if err := verifyResultWithDelay(func() (bool, error) { 932 skt := channelz.GetSocket(skID) 933 sktData := skt.SocketData 934 if sktData.StreamsStarted != 4 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 2 || sktData.MessagesSent != 4 || sktData.MessagesReceived != 3 { 935 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (4, 2, 2, 4, 3), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 936 } 937 return true, nil 938 }); err != nil { 939 t.Fatal(err) 940 } 941 942 doServerSideInitiatedFailedStreamWithGoAway(tc, t, rcw) 943 if err := verifyResultWithDelay(func() (bool, error) { 944 skt := channelz.GetSocket(skID) 945 sktData := skt.SocketData 946 if sktData.StreamsStarted != 6 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 3 || sktData.MessagesSent != 6 || sktData.MessagesReceived != 5 { 947 
return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (6, 2, 3, 6, 5), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 948 } 949 return true, nil 950 }); err != nil { 951 t.Fatal(err) 952 } 953} 954 955// This test is to complete TestCZClientSocketMetricsStreamsAndMessagesCount and 956// TestCZServerSocketMetricsStreamsAndMessagesCount by adding the test case of 957// server sending RST_STREAM to client due to client side flow control violation. 958// It is separated from other cases due to setup incompatibly, i.e. max receive 959// size violation will mask flow control violation. 960func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) { 961 czCleanup := channelz.NewChannelzStorage() 962 defer czCleanupWrapper(czCleanup, t) 963 e := tcpClearRREnv 964 te := newTest(t, e) 965 te.serverInitialWindowSize = 65536 966 // Avoid overflowing connection level flow control window, which will lead to 967 // transport being closed. 
968 te.serverInitialConnWindowSize = 65536 * 2 969 ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 970 stream.Send(&testpb.StreamingOutputCallResponse{}) 971 <-stream.Context().Done() 972 return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled") 973 }} 974 te.startServer(ts) 975 defer te.tearDown() 976 cc, dw := te.clientConnWithConnControl() 977 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 978 979 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 980 stream, err := tc.FullDuplexCall(ctx) 981 if err != nil { 982 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 983 } 984 if _, err := stream.Recv(); err != nil { 985 t.Fatalf("stream.Recv() = %v, want nil", err) 986 } 987 go func() { 988 payload := make([]byte, 16384) 989 for i := 0; i < 6; i++ { 990 dw.getRawConnWrapper().writeRawFrame(http2.FrameData, 0, tc.getCurrentStreamID(), payload) 991 } 992 }() 993 if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted { 994 t.Fatalf("stream.Recv() = %v, want error code: %v", err, codes.ResourceExhausted) 995 } 996 cancel() 997 998 if err := verifyResultWithDelay(func() (bool, error) { 999 tchan, _ := channelz.GetTopChannels(0, 0) 1000 if len(tchan) != 1 { 1001 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 1002 } 1003 if len(tchan[0].SubChans) != 1 { 1004 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 1005 } 1006 var id int64 1007 for id = range tchan[0].SubChans { 1008 break 1009 } 1010 sc := channelz.GetSubChannel(id) 1011 if sc == nil { 1012 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) 1013 } 1014 if len(sc.Sockets) != 1 { 1015 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) 1016 } 
1017 for id = range sc.Sockets { 1018 break 1019 } 1020 skt := channelz.GetSocket(id) 1021 sktData := skt.SocketData 1022 if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 { 1023 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed) 1024 } 1025 ss, _ := channelz.GetServers(0, 0) 1026 if len(ss) != 1 { 1027 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1028 } 1029 1030 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0) 1031 if len(ns) != 1 { 1032 return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns)) 1033 } 1034 sktData = ns[0].SocketData 1035 if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 { 1036 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed) 1037 } 1038 return true, nil 1039 }); err != nil { 1040 t.Fatal(err) 1041 } 1042} 1043 1044func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { 1045 czCleanup := channelz.NewChannelzStorage() 1046 defer czCleanupWrapper(czCleanup, t) 1047 e := tcpClearRREnv 1048 te := newTest(t, e) 1049 // disable BDP 1050 te.serverInitialWindowSize = 65536 1051 te.serverInitialConnWindowSize = 65536 1052 te.clientInitialWindowSize = 65536 1053 te.clientInitialConnWindowSize = 65536 1054 te.startServer(&testServer{security: e.security}) 1055 defer te.tearDown() 1056 cc := te.clientConn() 1057 tc := testpb.NewTestServiceClient(cc) 1058 1059 for i := 0; i < 10; i++ { 1060 doSuccessfulUnaryCall(tc, t) 1061 } 1062 1063 var cliSktID, svrSktID int64 1064 if err := verifyResultWithDelay(func() (bool, error) { 1065 tchan, _ := 
channelz.GetTopChannels(0, 0) 1066 if len(tchan) != 1 { 1067 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 1068 } 1069 if len(tchan[0].SubChans) != 1 { 1070 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 1071 } 1072 var id int64 1073 for id = range tchan[0].SubChans { 1074 break 1075 } 1076 sc := channelz.GetSubChannel(id) 1077 if sc == nil { 1078 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) 1079 } 1080 if len(sc.Sockets) != 1 { 1081 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) 1082 } 1083 for id = range sc.Sockets { 1084 break 1085 } 1086 skt := channelz.GetSocket(id) 1087 sktData := skt.SocketData 1088 // 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486 1089 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 { 1090 return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1091 } 1092 ss, _ := channelz.GetServers(0, 0) 1093 if len(ss) != 1 { 1094 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1095 } 1096 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0) 1097 sktData = ns[0].SocketData 1098 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 { 1099 return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1100 } 1101 cliSktID, svrSktID = id, ss[0].ID 1102 return true, nil 1103 }); err != nil { 1104 t.Fatal(err) 1105 } 1106 1107 doStreamingInputCallWithLargePayload(tc, t) 1108 1109 if err := verifyResultWithDelay(func() (bool, error) { 1110 skt := 
channelz.GetSocket(cliSktID) 1111 sktData := skt.SocketData 1112 // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486 1113 // Remote: 65536 - 5 (Length-Prefixed-Message size) * 10 - 10011 = 55475 1114 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 { 1115 return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1116 } 1117 ss, _ := channelz.GetServers(0, 0) 1118 if len(ss) != 1 { 1119 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1120 } 1121 ns, _ := channelz.GetServerSockets(svrSktID, 0, 0) 1122 sktData = ns[0].SocketData 1123 if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 { 1124 return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1125 } 1126 return true, nil 1127 }); err != nil { 1128 t.Fatal(err) 1129 } 1130 1131 // triggers transport flow control window update on server side, since unacked 1132 // bytes should be larger than limit now. i.e. 50 + 20022 > 65536/4. 
1133 doStreamingInputCallWithLargePayload(tc, t) 1134 if err := verifyResultWithDelay(func() (bool, error) { 1135 skt := channelz.GetSocket(cliSktID) 1136 sktData := skt.SocketData 1137 // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486 1138 // Remote: 65536 1139 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 { 1140 return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1141 } 1142 ss, _ := channelz.GetServers(0, 0) 1143 if len(ss) != 1 { 1144 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1145 } 1146 ns, _ := channelz.GetServerSockets(svrSktID, 0, 0) 1147 sktData = ns[0].SocketData 1148 if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 { 1149 return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) 1150 } 1151 return true, nil 1152 }); err != nil { 1153 t.Fatal(err) 1154 } 1155} 1156 1157func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) { 1158 czCleanup := channelz.NewChannelzStorage() 1159 defer czCleanupWrapper(czCleanup, t) 1160 defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime) 1161 internal.KeepaliveMinPingTime = time.Second 1162 e := tcpClearRREnv 1163 te := newTest(t, e) 1164 te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams( 1165 keepalive.ClientParameters{ 1166 Time: time.Second, 1167 Timeout: 500 * time.Millisecond, 1168 PermitWithoutStream: true, 1169 })) 1170 te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy( 1171 keepalive.EnforcementPolicy{ 1172 MinTime: 500 * time.Millisecond, 1173 PermitWithoutStream: true, 1174 })) 1175 
te.startServer(&testServer{security: e.security}) 1176 te.clientConn() // Dial the server 1177 defer te.tearDown() 1178 if err := verifyResultWithDelay(func() (bool, error) { 1179 tchan, _ := channelz.GetTopChannels(0, 0) 1180 if len(tchan) != 1 { 1181 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 1182 } 1183 if len(tchan[0].SubChans) != 1 { 1184 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 1185 } 1186 var id int64 1187 for id = range tchan[0].SubChans { 1188 break 1189 } 1190 sc := channelz.GetSubChannel(id) 1191 if sc == nil { 1192 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) 1193 } 1194 if len(sc.Sockets) != 1 { 1195 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) 1196 } 1197 for id = range sc.Sockets { 1198 break 1199 } 1200 skt := channelz.GetSocket(id) 1201 if skt.SocketData.KeepAlivesSent != 2 { 1202 return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent) 1203 } 1204 return true, nil 1205 }); err != nil { 1206 t.Fatal(err) 1207 } 1208} 1209 1210func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) { 1211 czCleanup := channelz.NewChannelzStorage() 1212 defer czCleanupWrapper(czCleanup, t) 1213 e := tcpClearRREnv 1214 te := newTest(t, e) 1215 te.maxServerReceiveMsgSize = newInt(20) 1216 te.maxClientReceiveMsgSize = newInt(20) 1217 te.startServer(&testServer{security: e.security}) 1218 defer te.tearDown() 1219 cc, _ := te.clientConnWithConnControl() 1220 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 1221 1222 var svrID int64 1223 if err := verifyResultWithDelay(func() (bool, error) { 1224 ss, _ := channelz.GetServers(0, 0) 1225 if len(ss) != 1 { 1226 return false, fmt.Errorf("there should only be one server, not %d", len(ss)) 1227 } 
1228 svrID = ss[0].ID 1229 return true, nil 1230 }); err != nil { 1231 t.Fatal(err) 1232 } 1233 1234 doSuccessfulUnaryCall(tc, t) 1235 if err := verifyResultWithDelay(func() (bool, error) { 1236 ns, _ := channelz.GetServerSockets(svrID, 0, 0) 1237 sktData := ns[0].SocketData 1238 if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 { 1239 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 1240 } 1241 return true, nil 1242 }); err != nil { 1243 t.Fatal(err) 1244 } 1245 1246 doServerSideFailedUnaryCall(tc, t) 1247 if err := verifyResultWithDelay(func() (bool, error) { 1248 ns, _ := channelz.GetServerSockets(svrID, 0, 0) 1249 sktData := ns[0].SocketData 1250 if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 { 1251 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 1252 } 1253 return true, nil 1254 }); err != nil { 1255 t.Fatal(err) 1256 } 1257 1258 doClientSideInitiatedFailedStream(tc, t) 1259 if err := verifyResultWithDelay(func() (bool, error) { 1260 ns, _ := channelz.GetServerSockets(svrID, 0, 0) 1261 sktData := ns[0].SocketData 1262 if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 2 { 1263 return false, fmt.Errorf("server socket metric with 
ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) 1264 } 1265 return true, nil 1266 }); err != nil { 1267 t.Fatal(err) 1268 } 1269} 1270 1271func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) { 1272 czCleanup := channelz.NewChannelzStorage() 1273 defer czCleanupWrapper(czCleanup, t) 1274 e := tcpClearRREnv 1275 te := newTest(t, e) 1276 // We setup the server keepalive parameters to send one keepalive every 1277 // second, and verify that the actual number of keepalives is very close to 1278 // the number of seconds elapsed in the test. We had a bug wherein the 1279 // server was sending one keepalive every [Time+Timeout] instead of every 1280 // [Time] period, and since Timeout is configured to a low value here, we 1281 // should be able to verify that the fix works with the above mentioned 1282 // logic. 
1283 kpOption := grpc.KeepaliveParams(keepalive.ServerParameters{ 1284 Time: time.Second, 1285 Timeout: 100 * time.Millisecond, 1286 }) 1287 te.customServerOptions = append(te.customServerOptions, kpOption) 1288 te.startServer(&testServer{security: e.security}) 1289 defer te.tearDown() 1290 cc := te.clientConn() 1291 tc := testpb.NewTestServiceClient(cc) 1292 start := time.Now() 1293 doIdleCallToInvokeKeepAlive(tc, t) 1294 1295 if err := verifyResultWithDelay(func() (bool, error) { 1296 ss, _ := channelz.GetServers(0, 0) 1297 if len(ss) != 1 { 1298 return false, fmt.Errorf("there should be one server, not %d", len(ss)) 1299 } 1300 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0) 1301 if len(ns) != 1 { 1302 return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns)) 1303 } 1304 wantKeepalivesCount := int64(time.Since(start).Seconds()) - 1 1305 if gotKeepalivesCount := ns[0].SocketData.KeepAlivesSent; gotKeepalivesCount != wantKeepalivesCount { 1306 return false, fmt.Errorf("got keepalivesCount: %v, want keepalivesCount: %v", gotKeepalivesCount, wantKeepalivesCount) 1307 } 1308 return true, nil 1309 }); err != nil { 1310 t.Fatal(err) 1311 } 1312} 1313 1314var cipherSuites = []string{ 1315 "TLS_RSA_WITH_RC4_128_SHA", 1316 "TLS_RSA_WITH_3DES_EDE_CBC_SHA", 1317 "TLS_RSA_WITH_AES_128_CBC_SHA", 1318 "TLS_RSA_WITH_AES_256_CBC_SHA", 1319 "TLS_RSA_WITH_AES_128_GCM_SHA256", 1320 "TLS_RSA_WITH_AES_256_GCM_SHA384", 1321 "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA", 1322 "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA", 1323 "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA", 1324 "TLS_ECDHE_RSA_WITH_RC4_128_SHA", 1325 "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA", 1326 "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", 1327 "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA", 1328 "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", 1329 "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", 1330 "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", 1331 "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", 1332 "TLS_FALLBACK_SCSV", 1333 
"TLS_RSA_WITH_AES_128_CBC_SHA256", 1334 "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256", 1335 "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", 1336 "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305", 1337 "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305", 1338 "TLS_AES_128_GCM_SHA256", 1339 "TLS_AES_256_GCM_SHA384", 1340 "TLS_CHACHA20_POLY1305_SHA256", 1341} 1342 1343func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) { 1344 czCleanup := channelz.NewChannelzStorage() 1345 defer czCleanupWrapper(czCleanup, t) 1346 e := tcpTLSRREnv 1347 te := newTest(t, e) 1348 te.startServer(&testServer{security: e.security}) 1349 defer te.tearDown() 1350 te.clientConn() 1351 if err := verifyResultWithDelay(func() (bool, error) { 1352 tchan, _ := channelz.GetTopChannels(0, 0) 1353 if len(tchan) != 1 { 1354 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) 1355 } 1356 if len(tchan[0].SubChans) != 1 { 1357 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) 1358 } 1359 var id int64 1360 for id = range tchan[0].SubChans { 1361 break 1362 } 1363 sc := channelz.GetSubChannel(id) 1364 if sc == nil { 1365 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) 1366 } 1367 if len(sc.Sockets) != 1 { 1368 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) 1369 } 1370 for id = range sc.Sockets { 1371 break 1372 } 1373 skt := channelz.GetSocket(id) 1374 cert, _ := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key")) 1375 securityVal, ok := skt.SocketData.Security.(*credentials.TLSChannelzSecurityValue) 1376 if !ok { 1377 return false, fmt.Errorf("the SocketData.Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.SocketData.Security) 1378 } 1379 if !reflect.DeepEqual(securityVal.RemoteCertificate, cert.Certificate[0]) { 1380 return false, 
fmt.Errorf("SocketData.Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0]) 1381 } 1382 for _, v := range cipherSuites { 1383 if v == securityVal.StandardName { 1384 return true, nil 1385 } 1386 } 1387 return false, fmt.Errorf("SocketData.Security.StandardName got: %v, want it to be one of %v", securityVal.StandardName, cipherSuites) 1388 }); err != nil { 1389 t.Fatal(err) 1390 } 1391} 1392 1393func (s) TestCZChannelTraceCreationDeletion(t *testing.T) { 1394 czCleanup := channelz.NewChannelzStorage() 1395 defer czCleanupWrapper(czCleanup, t) 1396 e := tcpClearRREnv 1397 // avoid calling API to set balancer type, which will void service config's change of balancer. 1398 e.balancer = "" 1399 te := newTest(t, e) 1400 r, cleanup := manual.GenerateAndRegisterManualResolver() 1401 defer cleanup() 1402 resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}} 1403 r.InitialState(resolver.State{Addresses: resolvedAddrs}) 1404 te.resolverScheme = r.Scheme() 1405 te.clientConn() 1406 defer te.tearDown() 1407 var nestedConn int64 1408 if err := verifyResultWithDelay(func() (bool, error) { 1409 tcs, _ := channelz.GetTopChannels(0, 0) 1410 if len(tcs) != 1 { 1411 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 1412 } 1413 if len(tcs[0].NestedChans) != 1 { 1414 return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans)) 1415 } 1416 for k := range tcs[0].NestedChans { 1417 nestedConn = k 1418 } 1419 for _, e := range tcs[0].Trace.Events { 1420 if e.RefID == nestedConn && e.RefType != channelz.RefChannel { 1421 return false, fmt.Errorf("nested channel trace event shoud have RefChannel as RefType") 1422 } 1423 } 1424 ncm := channelz.GetChannel(nestedConn) 1425 if ncm.Trace == nil { 1426 return false, fmt.Errorf("trace for nested channel should not be empty") 1427 } 1428 if len(ncm.Trace.Events) == 0 { 
1429 return false, fmt.Errorf("there should be at least one trace event for nested channel not 0") 1430 } 1431 if ncm.Trace.Events[0].Desc != "Channel Created" { 1432 return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc) 1433 } 1434 return true, nil 1435 }); err != nil { 1436 t.Fatal(err) 1437 } 1438 1439 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)}) 1440 1441 // wait for the shutdown of grpclb balancer 1442 if err := verifyResultWithDelay(func() (bool, error) { 1443 tcs, _ := channelz.GetTopChannels(0, 0) 1444 if len(tcs) != 1 { 1445 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 1446 } 1447 if len(tcs[0].NestedChans) != 0 { 1448 return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans)) 1449 } 1450 ncm := channelz.GetChannel(nestedConn) 1451 if ncm == nil { 1452 return false, fmt.Errorf("nested channel should still exist due to parent's trace reference") 1453 } 1454 if ncm.Trace == nil { 1455 return false, fmt.Errorf("trace for nested channel should not be empty") 1456 } 1457 if len(ncm.Trace.Events) == 0 { 1458 return false, fmt.Errorf("there should be at least one trace event for nested channel not 0") 1459 } 1460 if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" { 1461 return false, fmt.Errorf("the first trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[0].Desc) 1462 } 1463 return true, nil 1464 }); err != nil { 1465 t.Fatal(err) 1466 } 1467} 1468 1469func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) { 1470 czCleanup := channelz.NewChannelzStorage() 1471 defer czCleanupWrapper(czCleanup, t) 1472 e := tcpClearRREnv 1473 te := newTest(t, e) 1474 te.startServer(&testServer{security: e.security}) 1475 r, cleanup := manual.GenerateAndRegisterManualResolver() 1476 defer 
cleanup() 1477 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}}) 1478 te.resolverScheme = r.Scheme() 1479 te.clientConn() 1480 defer te.tearDown() 1481 var subConn int64 1482 // Here, we just wait for all sockets to be up. In the future, if we implement 1483 // IDLE, we may need to make several rpc calls to create the sockets. 1484 if err := verifyResultWithDelay(func() (bool, error) { 1485 tcs, _ := channelz.GetTopChannels(0, 0) 1486 if len(tcs) != 1 { 1487 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 1488 } 1489 if len(tcs[0].SubChans) != 1 { 1490 return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans)) 1491 } 1492 for k := range tcs[0].SubChans { 1493 subConn = k 1494 } 1495 for _, e := range tcs[0].Trace.Events { 1496 if e.RefID == subConn && e.RefType != channelz.RefSubChannel { 1497 return false, fmt.Errorf("subchannel trace event shoud have RefType to be RefSubChannel") 1498 } 1499 } 1500 scm := channelz.GetSubChannel(subConn) 1501 if scm == nil { 1502 return false, fmt.Errorf("subChannel does not exist") 1503 } 1504 if scm.Trace == nil { 1505 return false, fmt.Errorf("trace for subChannel should not be empty") 1506 } 1507 if len(scm.Trace.Events) == 0 { 1508 return false, fmt.Errorf("there should be at least one trace event for subChannel not 0") 1509 } 1510 if scm.Trace.Events[0].Desc != "Subchannel Created" { 1511 return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc) 1512 } 1513 return true, nil 1514 }); err != nil { 1515 t.Fatal(err) 1516 } 1517 1518 // Wait for ready 1519 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 1520 defer cancel() 1521 for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() { 1522 if !te.cc.WaitForStateChange(ctx, src) { 1523 t.Fatalf("timed out waiting for state change. 
got %v; want %v", src, connectivity.Ready) 1524 } 1525 } 1526 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}}) 1527 // Wait for not-ready. 1528 for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() { 1529 if !te.cc.WaitForStateChange(ctx, src) { 1530 t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready) 1531 } 1532 } 1533 1534 if err := verifyResultWithDelay(func() (bool, error) { 1535 tcs, _ := channelz.GetTopChannels(0, 0) 1536 if len(tcs) != 1 { 1537 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) 1538 } 1539 if len(tcs[0].SubChans) != 1 { 1540 return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans)) 1541 } 1542 scm := channelz.GetSubChannel(subConn) 1543 if scm == nil { 1544 return false, fmt.Errorf("subChannel should still exist due to parent's trace reference") 1545 } 1546 if scm.Trace == nil { 1547 return false, fmt.Errorf("trace for SubChannel should not be empty") 1548 } 1549 if len(scm.Trace.Events) == 0 { 1550 return false, fmt.Errorf("there should be at least one trace event for subChannel not 0") 1551 } 1552 if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want { 1553 return false, fmt.Errorf("the last trace event should be %q, not %q", want, got) 1554 } 1555 1556 return true, nil 1557 }); err != nil { 1558 t.Fatal(err) 1559 } 1560} 1561 1562func (s) TestCZChannelAddressResolutionChange(t *testing.T) { 1563 czCleanup := channelz.NewChannelzStorage() 1564 defer czCleanupWrapper(czCleanup, t) 1565 e := tcpClearRREnv 1566 e.balancer = "" 1567 te := newTest(t, e) 1568 te.startServer(&testServer{security: e.security}) 1569 r, cleanup := manual.GenerateAndRegisterManualResolver() 1570 defer cleanup() 1571 addrs := []resolver.Address{{Addr: te.srvAddr}} 1572 r.InitialState(resolver.State{Addresses: addrs}) 1573 te.resolverScheme = r.Scheme() 1574 
te.clientConn()
	defer te.tearDown()
	var cid int64
	// Here, we just wait for all sockets to be up. In the future, if we implement
	// IDLE, we may need to make several rpc calls to create the sockets.
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		cid = tcs[0].ID
		for i := len(tcs[0].Trace.Events) - 1; i >= 0; i-- {
			if strings.Contains(tcs[0].Trace.Events[i].Desc, "resolver returned new addresses") {
				break
			}
			if i == 0 {
				return false, fmt.Errorf("events do not contain expected address resolution from empty address state. Got: %+v", tcs[0].Trace.Events)
			}
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})

	if err := verifyResultWithDelay(func() (bool, error) {
		cm := channelz.GetChannel(cid)
		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
			if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) {
				break
			}
			if i == 0 {
				return false, fmt.Errorf("events do not contain expected address resolution change of LB policy")
			}
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	newSC := parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                }
            ],
            "waitForReady": false,
            "timeout": ".001s"
        }
    ]
}`)
	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC})

	if err := verifyResultWithDelay(func() (bool, error) {
		cm := channelz.GetChannel(cid)

		var es []string
		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
			if strings.Contains(cm.Trace.Events[i].Desc, "service config updated") {
				break
			}
			es = append(es, cm.Trace.Events[i].Desc)
			if i == 0 {
				return false, fmt.Errorf("events do not contain expected address resolution of new service config\n Events:\n%v", strings.Join(es, "\n"))
			}
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: newSC})

	if err := verifyResultWithDelay(func() (bool, error) {
		cm := channelz.GetChannel(cid)
		for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
			if strings.Contains(cm.Trace.Events[i].Desc, "resolver returned an empty address list") {
				break
			}
			if i == 0 {
				return false, fmt.Errorf("events do not contain expected address resolution of empty address")
			}
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZSubChannelPickedNewAddress starts three servers, hands all three
// addresses to the client through a manual resolver, completes one RPC, and
// then stops the first two servers. It passes once the single subchannel's
// trace contains an event saying it picked the third server's address.
func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	e.balancer = ""
	te := newTest(t, e)
	te.startServers(&testServer{security: e.security}, 3)
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()
	svrAddrs := make([]resolver.Address, 0, len(te.srvAddrs))
	for _, a := range te.srvAddrs {
		svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
	}
	r.InitialState(resolver.State{Addresses: svrAddrs})
	te.resolverScheme = r.Scheme()
	cc := te.clientConn()
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(cc)
	// make sure the connection is up
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	te.srvs[0].Stop()
	te.srvs[1].Stop()
	// With the first two servers stopped, poll until the subchannel's trace
	// records a pick of the third server's address.
	wantDesc := fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2])
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].SubChans) != 1 {
			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
		}
		var subConn int64
		for k := range tcs[0].SubChans {
			subConn = k
		}
		scm := channelz.GetSubChannel(subConn)
		// The subchannel may disappear between the two channelz queries; retry
		// instead of dereferencing a nil metric.
		if scm == nil {
			return false, fmt.Errorf("subChannel %d no longer exists", subConn)
		}
		if scm.Trace == nil {
			return false, fmt.Errorf("trace for SubChannel should not be empty")
		}
		if len(scm.Trace.Events) == 0 {
			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
		}
		// Scan from the newest event backwards; reaching index 0 without a
		// match means the pick has not been traced yet.
		for i := len(scm.Trace.Events) - 1; i >= 0; i-- {
			if scm.Trace.Events[i].Desc == wantDesc {
				break
			}
			if i == 0 {
				return false, fmt.Errorf("events do not contain expected address resolution of subchannel picked new address")
			}
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZSubChannelConnectivityState connects to a single server, completes
// one RPC, stops the server, and then checks the subchannel's trace for the
// expected connectivity-change events: exactly one READY, at least one
// CONNECTING, at least one TRANSIENT_FAILURE and exactly one SHUTDOWN.
func (s) TestCZSubChannelConnectivityState(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()
	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
	te.resolverScheme = r.Scheme()
	cc := te.clientConn()
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(cc)
	// make sure the connection is up
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	var subConn int64
	te.srv.Stop()

	// Expected trace descriptions, computed once instead of per event.
	changeTo := func(st connectivity.State) string {
		return fmt.Sprintf("Subchannel Connectivity change to %v", st)
	}
	readyDesc := changeTo(connectivity.Ready)
	connectingDesc := changeTo(connectivity.Connecting)
	transientDesc := changeTo(connectivity.TransientFailure)
	shutdownDesc := changeTo(connectivity.Shutdown)

	if err := verifyResultWithDelay(func() (bool, error) {
		// We need to obtain the SubChannel id before it gets deleted from the
		// Channel's children list (an effect of the r.UpdateState call below,
		// which replaces the address). Once captured, the id is reused on
		// later retries.
		if subConn == 0 {
			tcs, _ := channelz.GetTopChannels(0, 0)
			if len(tcs) != 1 {
				return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
			}
			if len(tcs[0].SubChans) != 1 {
				return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
			}
			for k := range tcs[0].SubChans {
				// get the SubChannel id for further trace inquiry.
				subConn = k
			}
		}
		scm := channelz.GetSubChannel(subConn)
		if scm == nil {
			return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
		}
		if scm.Trace == nil {
			return false, fmt.Errorf("trace for SubChannel should not be empty")
		}
		if len(scm.Trace.Events) == 0 {
			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
		}
		var ready, connecting, transient, shutdown int
		for _, e := range scm.Trace.Events {
			if e.Desc == transientDesc {
				transient++
			}
		}
		// Make sure the SubChannel has already seen transient failure before
		// shutting it down through the r.UpdateState call below.
		if transient == 0 {
			return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
		}
		transient = 0
		// Replace the server address with an unusable one. This runs on every
		// retry that reaches this point. The events counted below come from
		// the scm snapshot fetched above, i.e. before this call, so the
		// SHUTDOWN event can only be observed on a later retry.
		r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
		for _, e := range scm.Trace.Events {
			switch e.Desc {
			case readyDesc:
				ready++
			case connectingDesc:
				connecting++
			case transientDesc:
				transient++
			case shutdownDesc:
				shutdown++
			}
		}
		// example:
		// Subchannel Created
		// Subchannel's connectivity state changed to CONNECTING
		// Subchannel picked a new address: "localhost:36011"
		// Subchannel's connectivity state changed to READY
		// Subchannel's connectivity state changed to TRANSIENT_FAILURE
		// Subchannel's connectivity state changed to CONNECTING
		// Subchannel picked a new address: "localhost:36011"
		// Subchannel's connectivity state changed to SHUTDOWN
		// Subchannel Deleted
		if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 {
			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown)
		}

		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZChannelConnectivityState connects to a single server, completes one
// RPC, stops the server, and then checks the top channel's trace for exactly
// one READY, at least one CONNECTING and at least one TRANSIENT_FAILURE
// connectivity-change event.
func (s) TestCZChannelConnectivityState(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()
	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
	te.resolverScheme = r.Scheme()
	cc := te.clientConn()
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(cc)
	// make sure the connection is up
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	te.srv.Stop()

	// Expected trace descriptions, computed once instead of per event.
	readyDesc := fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready)
	connectingDesc := fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting)
	transientDesc := fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure)

	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}

		var ready, connecting, transient int
		for _, e := range tcs[0].Trace.Events {
			switch e.Desc {
			case readyDesc:
				ready++
			case connectingDesc:
				connecting++
			case transientDesc:
				transient++
			}
		}

		// example:
		// Channel Created
		// Addresses resolved (from empty address state): "localhost:40467"
		// SubChannel (id: 4[]) Created
		// Channel's connectivity state changed to CONNECTING
		// Channel's connectivity state changed to READY
		// Channel's connectivity state changed to TRANSIENT_FAILURE
		// Channel's connectivity state changed to CONNECTING
		// Channel's connectivity state changed to TRANSIENT_FAILURE
		if ready != 1 || connecting < 1 || transient < 1 {
			return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient)
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZTraceOverwriteChannelDeletion limits every trace to one entry, waits
// for a nested channel to appear under the top channel after resolving a
// grpclb address, switches the service config to round_robin, and verifies
// that the nested channel can no longer be fetched once the top channel's
// trace event referencing it has been overwritten.
func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	// avoid newTest using WithBalancer, which would override service config's change of balancer below.
	e.balancer = ""
	te := newTest(t, e)
	channelz.SetMaxTraceEntry(1)
	defer channelz.ResetMaxTraceEntryToDefault()
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()
	resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
	r.InitialState(resolver.State{Addresses: resolvedAddrs})
	te.resolverScheme = r.Scheme()
	te.clientConn()
	defer te.tearDown()
	var nestedConn int64
	// Wait for the single nested channel and remember its id.
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].NestedChans) != 1 {
			return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
		}
		for k := range tcs[0].NestedChans {
			nestedConn = k
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})

	// wait for the shutdown of grpclb balancer
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].NestedChans) != 0 {
			return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	// If nested channel deletion is last trace event before the next validation, it will fail, as the top channel will hold a reference to it.
	// This line forces a trace event on the top channel in that case.
	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})

	// verify that the nested channel no longer exist due to trace referencing it got overwritten.
	if err := verifyResultWithDelay(func() (bool, error) {
		cm := channelz.GetChannel(nestedConn)
		if cm != nil {
			return false, fmt.Errorf("nested channel should have been deleted since its parent's trace should not contain any reference to it anymore")
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZTraceOverwriteSubChannelDeletion limits every trace to one entry,
// records the id of the only subchannel, replaces the resolved address with
// an unusable one, and verifies that the original subchannel can no longer
// be fetched once the parent's trace event referencing it has been
// overwritten.
func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	te := newTest(t, e)
	channelz.SetMaxTraceEntry(1)
	defer channelz.ResetMaxTraceEntryToDefault()
	te.startServer(&testServer{security: e.security})
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()
	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
	te.resolverScheme = r.Scheme()
	te.clientConn()
	defer te.tearDown()
	var subConn int64
	// Here, we just wait for all sockets to be up. In the future, if we implement
	// IDLE, we may need to make several rpc calls to create the sockets.
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].SubChans) != 1 {
			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
		}
		for k := range tcs[0].SubChans {
			subConn = k
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	// Wait for ready
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
		if !te.cc.WaitForStateChange(ctx, src) {
			t.Fatalf("timed out waiting for state change.  got %v; want %v", src, connectivity.Ready)
		}
	}
	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
	// Wait for not-ready.
	for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
		if !te.cc.WaitForStateChange(ctx, src) {
			t.Fatalf("timed out waiting for state change.  got %v; want !%v", src, connectivity.Ready)
		}
	}

	// verify that the subchannel no longer exist due to trace referencing it got overwritten.
	// subConn is a subchannel id, so it is looked up with GetSubChannel;
	// querying it with GetChannel would not exercise the subchannel at all.
	if err := verifyResultWithDelay(func() (bool, error) {
		scm := channelz.GetSubChannel(subConn)
		if scm != nil {
			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}

// TestCZTraceTopChannelDeletionTraceClear records the id of the only
// subchannel, tears the test (and with it the client connection) down, and
// verifies that the subchannel can no longer be fetched afterwards.
func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
	czCleanup := channelz.NewChannelzStorage()
	defer czCleanupWrapper(czCleanup, t)
	e := tcpClearRREnv
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()
	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
	te.resolverScheme = r.Scheme()
	te.clientConn()
	var subConn int64
	// Here, we just wait for all sockets to be up. In the future, if we implement
	// IDLE, we may need to make several rpc calls to create the sockets.
	if err := verifyResultWithDelay(func() (bool, error) {
		tcs, _ := channelz.GetTopChannels(0, 0)
		if len(tcs) != 1 {
			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
		}
		if len(tcs[0].SubChans) != 1 {
			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
		}
		for k := range tcs[0].SubChans {
			subConn = k
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
	// tearDown is called explicitly (not deferred) because the check below
	// must run after the top channel has been torn down.
	te.tearDown()
	// verify that the subchannel no longer exist due to parent channel got deleted and its trace cleared.
	// subConn is a subchannel id, so it is looked up with GetSubChannel;
	// querying it with GetChannel would not exercise the subchannel at all.
	if err := verifyResultWithDelay(func() (bool, error) {
		scm := channelz.GetSubChannel(subConn)
		if scm != nil {
			return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}
}