/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"net"
	"sync"
	"testing"
	"time"

	"golang.org/x/net/http2"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/resolver/manual"
)

// stateRecordingBalancerName is the name reported by the state-recording
// balancer builder and the name the tests in this file pass to
// WithBalancerName to select it.
//
// NOTE(review): "recoding" looks like a typo for "recording". In this file the
// value is only used through this constant; confirm nothing else refers to the
// literal before changing it.
const stateRecordingBalancerName = "state_recoding_balancer"

// testBalancerBuilder is the single, package-level builder instance shared by
// all tests in this file. Each test retrieves the notification channel of the
// most recently built balancer from it via nextStateNotifier.
var testBalancerBuilder = newStateRecordingBalancerBuilder()

// init registers testBalancerBuilder with the balancer package so that it can
// be selected by name when dialing.
func init() {
	balancer.Register(testBalancerBuilder)
}

// TestStateTransitions_SingleAddress runs a table of single-address scenarios
// and checks the connectivity states reported for each one.
//
// These cases use a pipeListener (testutils.NewPipeListener). This listener is
// similar to net.Listener except that it is unbuffered, so each read and write
// will wait for the other side's corresponding write or read.
47func (s) TestStateTransitions_SingleAddress(t *testing.T) { 48 for _, test := range []struct { 49 desc string 50 want []connectivity.State 51 server func(net.Listener) net.Conn 52 }{ 53 { 54 desc: "When the server returns server preface, the client enters READY.", 55 want: []connectivity.State{ 56 connectivity.Connecting, 57 connectivity.Ready, 58 }, 59 server: func(lis net.Listener) net.Conn { 60 conn, err := lis.Accept() 61 if err != nil { 62 t.Error(err) 63 return nil 64 } 65 66 go keepReading(conn) 67 68 framer := http2.NewFramer(conn, conn) 69 if err := framer.WriteSettings(http2.Setting{}); err != nil { 70 t.Errorf("Error while writing settings frame. %v", err) 71 return nil 72 } 73 74 return conn 75 }, 76 }, 77 { 78 desc: "When the connection is closed, the client enters TRANSIENT FAILURE.", 79 want: []connectivity.State{ 80 connectivity.Connecting, 81 connectivity.TransientFailure, 82 }, 83 server: func(lis net.Listener) net.Conn { 84 conn, err := lis.Accept() 85 if err != nil { 86 t.Error(err) 87 return nil 88 } 89 90 conn.Close() 91 return nil 92 }, 93 }, 94 { 95 desc: `When the server sends its connection preface, but the connection dies before the client can write its 96connection preface, the client enters TRANSIENT FAILURE.`, 97 want: []connectivity.State{ 98 connectivity.Connecting, 99 connectivity.TransientFailure, 100 }, 101 server: func(lis net.Listener) net.Conn { 102 conn, err := lis.Accept() 103 if err != nil { 104 t.Error(err) 105 return nil 106 } 107 108 framer := http2.NewFramer(conn, conn) 109 if err := framer.WriteSettings(http2.Setting{}); err != nil { 110 t.Errorf("Error while writing settings frame. 
%v", err) 111 return nil 112 } 113 114 conn.Close() 115 return nil 116 }, 117 }, 118 { 119 desc: `When the server reads the client connection preface but does not send its connection preface, the 120client enters TRANSIENT FAILURE.`, 121 want: []connectivity.State{ 122 connectivity.Connecting, 123 connectivity.TransientFailure, 124 }, 125 server: func(lis net.Listener) net.Conn { 126 conn, err := lis.Accept() 127 if err != nil { 128 t.Error(err) 129 return nil 130 } 131 132 go keepReading(conn) 133 134 return conn 135 }, 136 }, 137 } { 138 t.Log(test.desc) 139 testStateTransitionSingleAddress(t, test.want, test.server) 140 } 141} 142 143func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) { 144 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 145 defer cancel() 146 147 pl := testutils.NewPipeListener() 148 defer pl.Close() 149 150 // Launch the server. 151 var conn net.Conn 152 var connMu sync.Mutex 153 go func() { 154 connMu.Lock() 155 conn = server(pl) 156 connMu.Unlock() 157 }() 158 159 client, err := DialContext(ctx, 160 "", 161 WithInsecure(), 162 WithBalancerName(stateRecordingBalancerName), 163 WithDialer(pl.Dialer()), 164 withBackoff(noBackoff{}), 165 withMinConnectDeadline(func() time.Duration { return time.Millisecond * 100 })) 166 if err != nil { 167 t.Fatal(err) 168 } 169 defer client.Close() 170 171 stateNotifications := testBalancerBuilder.nextStateNotifier() 172 173 timeout := time.After(5 * time.Second) 174 175 for i := 0; i < len(want); i++ { 176 select { 177 case <-timeout: 178 t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) 179 case seen := <-stateNotifications: 180 if seen != want[i] { 181 t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) 182 } 183 } 184 } 185 186 connMu.Lock() 187 defer connMu.Unlock() 188 if conn != nil { 189 err = conn.Close() 190 if err != nil { 191 t.Fatal(err) 192 } 193 } 
194} 195 196// When a READY connection is closed, the client enters CONNECTING. 197func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { 198 want := []connectivity.State{ 199 connectivity.Connecting, 200 connectivity.Ready, 201 connectivity.Connecting, 202 } 203 204 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 205 defer cancel() 206 207 lis, err := net.Listen("tcp", "localhost:0") 208 if err != nil { 209 t.Fatalf("Error while listening. Err: %v", err) 210 } 211 defer lis.Close() 212 213 sawReady := make(chan struct{}) 214 215 // Launch the server. 216 go func() { 217 conn, err := lis.Accept() 218 if err != nil { 219 t.Error(err) 220 return 221 } 222 223 go keepReading(conn) 224 225 framer := http2.NewFramer(conn, conn) 226 if err := framer.WriteSettings(http2.Setting{}); err != nil { 227 t.Errorf("Error while writing settings frame. %v", err) 228 return 229 } 230 231 // Prevents race between onPrefaceReceipt and onClose. 232 <-sawReady 233 234 conn.Close() 235 }() 236 237 client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBalancerName(stateRecordingBalancerName)) 238 if err != nil { 239 t.Fatal(err) 240 } 241 defer client.Close() 242 243 stateNotifications := testBalancerBuilder.nextStateNotifier() 244 245 timeout := time.After(5 * time.Second) 246 247 for i := 0; i < len(want); i++ { 248 select { 249 case <-timeout: 250 t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) 251 case seen := <-stateNotifications: 252 if seen == connectivity.Ready { 253 close(sawReady) 254 } 255 if seen != want[i] { 256 t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) 257 } 258 } 259 } 260} 261 262// When the first connection is closed, the client stays in CONNECTING until it 263// tries the second address (which succeeds, and then it enters READY). 
264func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) { 265 want := []connectivity.State{ 266 connectivity.Connecting, 267 connectivity.Ready, 268 } 269 270 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 271 defer cancel() 272 273 lis1, err := net.Listen("tcp", "localhost:0") 274 if err != nil { 275 t.Fatalf("Error while listening. Err: %v", err) 276 } 277 defer lis1.Close() 278 279 lis2, err := net.Listen("tcp", "localhost:0") 280 if err != nil { 281 t.Fatalf("Error while listening. Err: %v", err) 282 } 283 defer lis2.Close() 284 285 server1Done := make(chan struct{}) 286 server2Done := make(chan struct{}) 287 288 // Launch server 1. 289 go func() { 290 conn, err := lis1.Accept() 291 if err != nil { 292 t.Error(err) 293 return 294 } 295 296 conn.Close() 297 close(server1Done) 298 }() 299 // Launch server 2. 300 go func() { 301 conn, err := lis2.Accept() 302 if err != nil { 303 t.Error(err) 304 return 305 } 306 307 go keepReading(conn) 308 309 framer := http2.NewFramer(conn, conn) 310 if err := framer.WriteSettings(http2.Setting{}); err != nil { 311 t.Errorf("Error while writing settings frame. 
%v", err) 312 return 313 } 314 315 close(server2Done) 316 }() 317 318 rb := manual.NewBuilderWithScheme("whatever") 319 rb.InitialState(resolver.State{Addresses: []resolver.Address{ 320 {Addr: lis1.Addr().String()}, 321 {Addr: lis2.Addr().String()}, 322 }}) 323 client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb)) 324 if err != nil { 325 t.Fatal(err) 326 } 327 defer client.Close() 328 329 stateNotifications := testBalancerBuilder.nextStateNotifier() 330 331 timeout := time.After(5 * time.Second) 332 333 for i := 0; i < len(want); i++ { 334 select { 335 case <-timeout: 336 t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) 337 case seen := <-stateNotifications: 338 if seen != want[i] { 339 t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) 340 } 341 } 342 } 343 select { 344 case <-timeout: 345 t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1") 346 case <-server1Done: 347 } 348 select { 349 case <-timeout: 350 t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2") 351 case <-server2Done: 352 } 353} 354 355// When there are multiple addresses, and we enter READY on one of them, a 356// later closure should cause the client to enter CONNECTING 357func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { 358 want := []connectivity.State{ 359 connectivity.Connecting, 360 connectivity.Ready, 361 connectivity.Connecting, 362 } 363 364 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 365 defer cancel() 366 367 lis1, err := net.Listen("tcp", "localhost:0") 368 if err != nil { 369 t.Fatalf("Error while listening. 
Err: %v", err) 370 } 371 defer lis1.Close() 372 373 // Never actually gets used; we just want it to be alive so that the resolver has two addresses to target. 374 lis2, err := net.Listen("tcp", "localhost:0") 375 if err != nil { 376 t.Fatalf("Error while listening. Err: %v", err) 377 } 378 defer lis2.Close() 379 380 server1Done := make(chan struct{}) 381 sawReady := make(chan struct{}) 382 383 // Launch server 1. 384 go func() { 385 conn, err := lis1.Accept() 386 if err != nil { 387 t.Error(err) 388 return 389 } 390 391 go keepReading(conn) 392 393 framer := http2.NewFramer(conn, conn) 394 if err := framer.WriteSettings(http2.Setting{}); err != nil { 395 t.Errorf("Error while writing settings frame. %v", err) 396 return 397 } 398 399 <-sawReady 400 401 conn.Close() 402 403 _, err = lis1.Accept() 404 if err != nil { 405 t.Error(err) 406 return 407 } 408 409 close(server1Done) 410 }() 411 412 rb := manual.NewBuilderWithScheme("whatever") 413 rb.InitialState(resolver.State{Addresses: []resolver.Address{ 414 {Addr: lis1.Addr().String()}, 415 {Addr: lis2.Addr().String()}, 416 }}) 417 client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb)) 418 if err != nil { 419 t.Fatal(err) 420 } 421 defer client.Close() 422 423 stateNotifications := testBalancerBuilder.nextStateNotifier() 424 425 timeout := time.After(2 * time.Second) 426 427 for i := 0; i < len(want); i++ { 428 select { 429 case <-timeout: 430 t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) 431 case seen := <-stateNotifications: 432 if seen == connectivity.Ready { 433 close(sawReady) 434 } 435 if seen != want[i] { 436 t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) 437 } 438 } 439 } 440 select { 441 case <-timeout: 442 t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1") 443 case <-server1Done: 
444 } 445} 446 447type stateRecordingBalancer struct { 448 notifier chan<- connectivity.State 449 balancer.Balancer 450} 451 452func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { 453 b.notifier <- s.ConnectivityState 454 b.Balancer.UpdateSubConnState(sc, s) 455} 456 457func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) { 458 b.notifier = r 459} 460 461func (b *stateRecordingBalancer) Close() { 462 b.Balancer.Close() 463} 464 465type stateRecordingBalancerBuilder struct { 466 mu sync.Mutex 467 notifier chan connectivity.State // The notifier used in the last Balancer. 468} 469 470func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder { 471 return &stateRecordingBalancerBuilder{} 472} 473 474func (b *stateRecordingBalancerBuilder) Name() string { 475 return stateRecordingBalancerName 476} 477 478func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 479 stateNotifications := make(chan connectivity.State, 10) 480 b.mu.Lock() 481 b.notifier = stateNotifications 482 b.mu.Unlock() 483 return &stateRecordingBalancer{ 484 notifier: stateNotifications, 485 Balancer: balancer.Get(PickFirstBalancerName).Build(cc, opts), 486 } 487} 488 489func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State { 490 b.mu.Lock() 491 defer b.mu.Unlock() 492 ret := b.notifier 493 b.notifier = nil 494 return ret 495} 496 497type noBackoff struct{} 498 499func (b noBackoff) Backoff(int) time.Duration { return time.Duration(0) } 500 501// Keep reading until something causes the connection to die (EOF, server 502// closed, etc). Useful as a tool for mindlessly keeping the connection 503// healthy, since the client will error if things like client prefaces are not 504// accepted in a timely fashion. 
func keepReading(conn net.Conn) {
	// The bytes themselves are discarded; the scratch buffer only gives Read
	// somewhere to put them.
	scratch := make([]byte, 1024)
	for {
		if _, err := conn.Read(scratch); err != nil {
			// Any read error ends the drain.
			return
		}
	}
}