1package consul 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "crypto/x509" 8 "encoding/binary" 9 "errors" 10 "fmt" 11 "io" 12 "io/ioutil" 13 "math" 14 "net" 15 "os" 16 "path/filepath" 17 "strings" 18 "sync" 19 "testing" 20 "time" 21 22 "github.com/hashicorp/go-hclog" 23 "github.com/hashicorp/go-memdb" 24 "github.com/hashicorp/go-msgpack/codec" 25 msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" 26 "github.com/hashicorp/raft" 27 "github.com/stretchr/testify/assert" 28 "github.com/stretchr/testify/require" 29 "google.golang.org/grpc" 30 31 "github.com/hashicorp/consul/acl" 32 "github.com/hashicorp/consul/agent/connect" 33 "github.com/hashicorp/consul/agent/consul/state" 34 agent_grpc "github.com/hashicorp/consul/agent/grpc" 35 "github.com/hashicorp/consul/agent/pool" 36 "github.com/hashicorp/consul/agent/structs" 37 tokenStore "github.com/hashicorp/consul/agent/token" 38 "github.com/hashicorp/consul/api" 39 "github.com/hashicorp/consul/proto/pbsubscribe" 40 "github.com/hashicorp/consul/sdk/testutil" 41 "github.com/hashicorp/consul/sdk/testutil/retry" 42 "github.com/hashicorp/consul/testrpc" 43 "github.com/hashicorp/consul/tlsutil" 44) 45 46func TestRPC_NoLeader_Fail(t *testing.T) { 47 if testing.Short() { 48 t.Skip("too slow for testing.Short") 49 } 50 51 t.Parallel() 52 dir1, s1 := testServerWithConfig(t, func(c *Config) { 53 c.RPCHoldTimeout = 1 * time.Millisecond 54 }) 55 defer os.RemoveAll(dir1) 56 defer s1.Shutdown() 57 codec := rpcClient(t, s1) 58 defer codec.Close() 59 60 arg := structs.RegisterRequest{ 61 Datacenter: "dc1", 62 Node: "foo", 63 Address: "127.0.0.1", 64 } 65 var out struct{} 66 67 // Make sure we eventually fail with a no leader error, which we should 68 // see given the short timeout. 69 err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) 70 if err == nil || err.Error() != structs.ErrNoLeader.Error() { 71 t.Fatalf("bad: %v", err) 72 } 73 74 // Now make sure it goes through. 
75 testrpc.WaitForTestAgent(t, s1.RPC, "dc1") 76 err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) 77 if err != nil { 78 t.Fatalf("bad: %v", err) 79 } 80} 81 82func TestRPC_NoLeader_Fail_on_stale_read(t *testing.T) { 83 if testing.Short() { 84 t.Skip("too slow for testing.Short") 85 } 86 87 t.Parallel() 88 dir1, s1 := testServerWithConfig(t, func(c *Config) { 89 c.RPCHoldTimeout = 1 * time.Millisecond 90 }) 91 defer os.RemoveAll(dir1) 92 defer s1.Shutdown() 93 codec := rpcClient(t, s1) 94 defer codec.Close() 95 96 arg := structs.RegisterRequest{ 97 Datacenter: "dc1", 98 Node: "foo", 99 Address: "127.0.0.1", 100 } 101 var out struct{} 102 103 // Make sure we eventually fail with a no leader error, which we should 104 // see given the short timeout. 105 err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) 106 if err == nil || err.Error() != structs.ErrNoLeader.Error() { 107 t.Fatalf("bad: %v", err) 108 } 109 110 // Until leader has never been known, stale should fail 111 getKeysReq := structs.KeyListRequest{ 112 Datacenter: "dc1", 113 Prefix: "", 114 Seperator: "/", 115 QueryOptions: structs.QueryOptions{AllowStale: true}, 116 } 117 var keyList structs.IndexedKeyList 118 if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err.Error() != structs.ErrNoLeader.Error() { 119 t.Fatalf("expected %v but got err: %v", structs.ErrNoLeader, err) 120 } 121 122 testrpc.WaitForTestAgent(t, s1.RPC, "dc1") 123 if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err != nil { 124 t.Fatalf("Did not expect any error but got err: %v", err) 125 } 126} 127 128func TestRPC_NoLeader_Retry(t *testing.T) { 129 if testing.Short() { 130 t.Skip("too slow for testing.Short") 131 } 132 133 t.Parallel() 134 dir1, s1 := testServerWithConfig(t, func(c *Config) { 135 c.RPCHoldTimeout = 10 * time.Second 136 }) 137 defer os.RemoveAll(dir1) 138 defer s1.Shutdown() 139 codec := rpcClient(t, s1) 140 defer 
codec.Close() 141 142 arg := structs.RegisterRequest{ 143 Datacenter: "dc1", 144 Node: "foo", 145 Address: "127.0.0.1", 146 } 147 var out struct{} 148 149 // This isn't sure-fire but tries to check that we don't have a 150 // leader going into the RPC, so we exercise the retry logic. 151 if ok, _, _ := s1.getLeader(); ok { 152 t.Fatalf("should not have a leader yet") 153 } 154 155 // The timeout is long enough to ride out any reasonable leader 156 // election. 157 err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) 158 if err != nil { 159 t.Fatalf("bad: %v", err) 160 } 161} 162 163func TestRPC_getLeader_ErrLeaderNotTracked(t *testing.T) { 164 if testing.Short() { 165 t.Skip("too slow for testing.Short") 166 } 167 168 cluster := newTestCluster(t, &testClusterConfig{ 169 Datacenter: "dc1", 170 Servers: 3, 171 ServerWait: func(t *testing.T, srv *Server) { 172 // The test cluster waits for a leader to be established 173 // but not for all the RPC tracking of all servers to be updated 174 // so we also want to wait for that here 175 retry.Run(t, func(r *retry.R) { 176 if !srv.IsLeader() { 177 _, _, err := srv.getLeader() 178 require.NoError(r, err) 179 } 180 }) 181 182 }, 183 }) 184 185 // At this point we know we have a cluster with a leader and all followers are tracking that 186 // leader in the serverLookup struct. 
We need to find a follower to hack its server lookup 187 // to force the error we desire 188 189 var follower *Server 190 for _, srv := range cluster.Servers { 191 if !srv.IsLeader() { 192 follower = srv 193 break 194 } 195 } 196 197 _, leaderMeta, err := follower.getLeader() 198 require.NoError(t, err) 199 200 // now do some behind the scenes trickery on the followers server lookup 201 // to remove the leader from it so that we can force a ErrLeaderNotTracked error 202 follower.serverLookup.RemoveServer(leaderMeta) 203 204 isLeader, meta, err := follower.getLeader() 205 require.Error(t, err) 206 require.True(t, errors.Is(err, structs.ErrLeaderNotTracked)) 207 require.Nil(t, meta) 208 require.False(t, isLeader) 209} 210 211type MockSink struct { 212 *bytes.Buffer 213 cancel bool 214} 215 216func (m *MockSink) ID() string { 217 return "Mock" 218} 219 220func (m *MockSink) Cancel() error { 221 m.cancel = true 222 return nil 223} 224 225func (m *MockSink) Close() error { 226 return nil 227} 228 229func TestRPC_blockingQuery(t *testing.T) { 230 t.Parallel() 231 dir, s := testServer(t) 232 defer os.RemoveAll(dir) 233 defer s.Shutdown() 234 235 require := require.New(t) 236 assert := assert.New(t) 237 238 // Perform a non-blocking query. Note that it's significant that the meta has 239 // a zero index in response - the implied opts.MinQueryIndex is also zero but 240 // this should not block still. 241 { 242 var opts structs.QueryOptions 243 var meta structs.QueryMeta 244 var calls int 245 fn := func(_ memdb.WatchSet, _ *state.Store) error { 246 calls++ 247 return nil 248 } 249 if err := s.blockingQuery(&opts, &meta, fn); err != nil { 250 t.Fatalf("err: %v", err) 251 } 252 if calls != 1 { 253 t.Fatalf("bad: %d", calls) 254 } 255 } 256 257 // Perform a blocking query that gets woken up and loops around once. 
258 { 259 opts := structs.QueryOptions{ 260 MinQueryIndex: 3, 261 } 262 var meta structs.QueryMeta 263 var calls int 264 fn := func(ws memdb.WatchSet, _ *state.Store) error { 265 if calls == 0 { 266 meta.Index = 3 267 268 fakeCh := make(chan struct{}) 269 close(fakeCh) 270 ws.Add(fakeCh) 271 } else { 272 meta.Index = 4 273 } 274 calls++ 275 return nil 276 } 277 if err := s.blockingQuery(&opts, &meta, fn); err != nil { 278 t.Fatalf("err: %v", err) 279 } 280 if calls != 2 { 281 t.Fatalf("bad: %d", calls) 282 } 283 } 284 285 // Perform a blocking query that returns a zero index from blocking func (e.g. 286 // no state yet). This should still return an empty response immediately, but 287 // with index of 1 and then block on the next attempt. In one sense zero index 288 // is not really a valid response from a state method that is not an error but 289 // in practice a lot of state store operations do return it unless they 290 // explicitly special checks to turn 0 into 1. Often this is not caught or 291 // covered by tests but eventually when hit in the wild causes blocking 292 // clients to busy loop and burn CPU. This test ensure that blockingQuery 293 // systematically does the right thing to prevent future bugs like that. 294 { 295 opts := structs.QueryOptions{ 296 MinQueryIndex: 0, 297 } 298 var meta structs.QueryMeta 299 var calls int 300 fn := func(ws memdb.WatchSet, _ *state.Store) error { 301 if opts.MinQueryIndex > 0 { 302 // If client requested blocking, block forever. This is simulating 303 // waiting for the watched resource to be initialized/written to giving 304 // it a non-zero index. Note the timeout on the query options is relied 305 // on to stop the test taking forever. 
306 fakeCh := make(chan struct{}) 307 ws.Add(fakeCh) 308 } 309 meta.Index = 0 310 calls++ 311 return nil 312 } 313 require.NoError(s.blockingQuery(&opts, &meta, fn)) 314 assert.Equal(1, calls) 315 assert.Equal(uint64(1), meta.Index, 316 "expect fake index of 1 to force client to block on next update") 317 318 // Simulate client making next request 319 opts.MinQueryIndex = 1 320 opts.MaxQueryTime = 20 * time.Millisecond // Don't wait too long 321 322 // This time we should block even though the func returns index 0 still 323 t0 := time.Now() 324 require.NoError(s.blockingQuery(&opts, &meta, fn)) 325 t1 := time.Now() 326 assert.Equal(2, calls) 327 assert.Equal(uint64(1), meta.Index, 328 "expect fake index of 1 to force client to block on next update") 329 assert.True(t1.Sub(t0) > 20*time.Millisecond, 330 "should have actually blocked waiting for timeout") 331 332 } 333 334 // Perform a query that blocks and gets interrupted when the state store 335 // is abandoned. 336 { 337 opts := structs.QueryOptions{ 338 MinQueryIndex: 3, 339 } 340 var meta structs.QueryMeta 341 var calls int 342 fn := func(_ memdb.WatchSet, _ *state.Store) error { 343 if calls == 0 { 344 meta.Index = 3 345 346 snap, err := s.fsm.Snapshot() 347 if err != nil { 348 t.Fatalf("err: %v", err) 349 } 350 defer snap.Release() 351 352 buf := bytes.NewBuffer(nil) 353 sink := &MockSink{buf, false} 354 if err := snap.Persist(sink); err != nil { 355 t.Fatalf("err: %v", err) 356 } 357 358 if err := s.fsm.Restore(sink); err != nil { 359 t.Fatalf("err: %v", err) 360 } 361 } 362 calls++ 363 return nil 364 } 365 if err := s.blockingQuery(&opts, &meta, fn); err != nil { 366 t.Fatalf("err: %v", err) 367 } 368 if calls != 1 { 369 t.Fatalf("bad: %d", calls) 370 } 371 } 372} 373 374func TestRPC_ReadyForConsistentReads(t *testing.T) { 375 if testing.Short() { 376 t.Skip("too slow for testing.Short") 377 } 378 379 t.Parallel() 380 dir, s := testServerWithConfig(t, func(c *Config) { 381 c.RPCHoldTimeout = 2 * 
time.Millisecond 382 }) 383 defer os.RemoveAll(dir) 384 defer s.Shutdown() 385 386 testrpc.WaitForLeader(t, s.RPC, "dc1") 387 388 if !s.isReadyForConsistentReads() { 389 t.Fatal("Server should be ready for consistent reads") 390 } 391 392 s.resetConsistentReadReady() 393 err := s.consistentRead() 394 if err.Error() != "Not ready to serve consistent reads" { 395 t.Fatal("Server should NOT be ready for consistent reads") 396 } 397 398 go func() { 399 time.Sleep(100 * time.Millisecond) 400 s.setConsistentReadReady() 401 }() 402 403 retry.Run(t, func(r *retry.R) { 404 if err := s.consistentRead(); err != nil { 405 r.Fatalf("Expected server to be ready for consistent reads, got error %v", err) 406 } 407 }) 408} 409 410func TestRPC_MagicByteTimeout(t *testing.T) { 411 if testing.Short() { 412 t.Skip("too slow for testing.Short") 413 } 414 415 t.Parallel() 416 dir1, s1 := testServerWithConfig(t, func(c *Config) { 417 c.RPCHandshakeTimeout = 10 * time.Millisecond 418 }) 419 defer os.RemoveAll(dir1) 420 defer s1.Shutdown() 421 422 // Connect to the server with bare TCP to simulate a malicious client trying 423 // to hold open resources. 424 addr := s1.config.RPCAdvertise 425 conn, err := net.DialTimeout("tcp", addr.String(), time.Second) 426 require.NoError(t, err) 427 defer conn.Close() 428 429 // Wait for more than the timeout. This is timing dependent so could fail if 430 // the CPU is super overloaded so the handler goroutine so I'm using a retry 431 // loop below to be sure but this feels like a pretty generous margin for 432 // error (10x the timeout and 100ms of scheduling time). 433 time.Sleep(100 * time.Millisecond) 434 435 // Set a read deadline on the Conn in case the timeout is not working we don't 436 // want the read below to block forever. Needs to be much longer than what we 437 // expect and the error should be different too. 
438 conn.SetReadDeadline(time.Now().Add(3 * time.Second)) 439 440 retry.Run(t, func(r *retry.R) { 441 // Sanity check the conn was closed by attempting to read from it (a write 442 // might not detect the close). 443 buf := make([]byte, 10) 444 _, err = conn.Read(buf) 445 require.Error(r, err) 446 require.Contains(r, err.Error(), "EOF") 447 }) 448} 449 450func TestRPC_TLSHandshakeTimeout(t *testing.T) { 451 if testing.Short() { 452 t.Skip("too slow for testing.Short") 453 } 454 455 t.Parallel() 456 457 dir1, s1 := testServerWithConfig(t, func(c *Config) { 458 c.RPCHandshakeTimeout = 10 * time.Millisecond 459 c.UseTLS = true 460 c.CAFile = "../../test/hostname/CertAuth.crt" 461 c.CertFile = "../../test/hostname/Alice.crt" 462 c.KeyFile = "../../test/hostname/Alice.key" 463 c.VerifyServerHostname = true 464 c.VerifyOutgoing = true 465 c.VerifyIncoming = true 466 }) 467 defer os.RemoveAll(dir1) 468 defer s1.Shutdown() 469 470 // Connect to the server with TLS magic byte delivered on time 471 addr := s1.config.RPCAdvertise 472 conn, err := net.DialTimeout("tcp", addr.String(), time.Second) 473 require.NoError(t, err) 474 defer conn.Close() 475 476 // Write TLS byte to avoid being closed by either the (outer) first byte 477 // timeout or the fact that server requires TLS 478 _, err = conn.Write([]byte{byte(pool.RPCTLS)}) 479 require.NoError(t, err) 480 481 // Wait for more than the timeout before we start a TLS handshake. This is 482 // timing dependent so could fail if the CPU is super overloaded so the 483 // handler goroutine so I'm using a retry loop below to be sure but this feels 484 // like a pretty generous margin for error (10x the timeout and 100ms of 485 // scheduling time). 486 time.Sleep(100 * time.Millisecond) 487 488 // Set a read deadline on the Conn in case the timeout is not working we don't 489 // want the read below to block forever. Needs to be much longer than what we 490 // expect and the error should be different too. 
491 conn.SetReadDeadline(time.Now().Add(3 * time.Second)) 492 493 retry.Run(t, func(r *retry.R) { 494 // Sanity check the conn was closed by attempting to read from it (a write 495 // might not detect the close). 496 buf := make([]byte, 10) 497 _, err = conn.Read(buf) 498 require.Error(r, err) 499 require.Contains(r, err.Error(), "EOF") 500 }) 501} 502 503func TestRPC_PreventsTLSNesting(t *testing.T) { 504 if testing.Short() { 505 t.Skip("too slow for testing.Short") 506 } 507 508 t.Parallel() 509 510 cases := []struct { 511 name string 512 outerByte pool.RPCType 513 innerByte pool.RPCType 514 wantClose bool 515 }{ 516 { 517 // Base case, sanity check normal RPC in TLS works 518 name: "RPC in TLS", 519 outerByte: pool.RPCTLS, 520 innerByte: pool.RPCConsul, 521 wantClose: false, 522 }, 523 { 524 // Nested TLS-in-TLS 525 name: "TLS in TLS", 526 outerByte: pool.RPCTLS, 527 innerByte: pool.RPCTLS, 528 wantClose: true, 529 }, 530 { 531 // Nested TLS-in-TLS 532 name: "TLS in Insecure TLS", 533 outerByte: pool.RPCTLSInsecure, 534 innerByte: pool.RPCTLS, 535 wantClose: true, 536 }, 537 { 538 // Nested TLS-in-TLS 539 name: "Insecure TLS in TLS", 540 outerByte: pool.RPCTLS, 541 innerByte: pool.RPCTLSInsecure, 542 wantClose: true, 543 }, 544 { 545 // Nested TLS-in-TLS 546 name: "Insecure TLS in Insecure TLS", 547 outerByte: pool.RPCTLSInsecure, 548 innerByte: pool.RPCTLSInsecure, 549 wantClose: true, 550 }, 551 } 552 553 for _, tc := range cases { 554 t.Run(tc.name, func(t *testing.T) { 555 dir1, s1 := testServerWithConfig(t, func(c *Config) { 556 c.UseTLS = true 557 c.CAFile = "../../test/hostname/CertAuth.crt" 558 c.CertFile = "../../test/hostname/Alice.crt" 559 c.KeyFile = "../../test/hostname/Alice.key" 560 c.VerifyServerHostname = true 561 c.VerifyOutgoing = true 562 c.VerifyIncoming = false // saves us getting client cert setup 563 c.Domain = "consul" 564 }) 565 defer os.RemoveAll(dir1) 566 defer s1.Shutdown() 567 568 // Connect to the server with TLS magic byte 
delivered on time 569 addr := s1.config.RPCAdvertise 570 conn, err := net.DialTimeout("tcp", addr.String(), time.Second) 571 require.NoError(t, err) 572 defer conn.Close() 573 574 // Write Outer magic byte 575 _, err = conn.Write([]byte{byte(tc.outerByte)}) 576 require.NoError(t, err) 577 578 // Start tls client 579 tlsWrap := s1.tlsConfigurator.OutgoingRPCWrapper() 580 tlsConn, err := tlsWrap("dc1", conn) 581 require.NoError(t, err) 582 583 // Write Inner magic byte 584 _, err = tlsConn.Write([]byte{byte(tc.innerByte)}) 585 require.NoError(t, err) 586 587 if tc.wantClose { 588 // Allow up to a second for a read failure to indicate conn was closed by 589 // server. 590 conn.SetReadDeadline(time.Now().Add(1 * time.Second)) 591 592 retry.Run(t, func(r *retry.R) { 593 // Sanity check the conn was closed by attempting to read from it (a 594 // write might not detect the close). 595 buf := make([]byte, 10) 596 _, err = tlsConn.Read(buf) 597 require.Error(r, err) 598 require.Contains(r, err.Error(), "EOF") 599 }) 600 } else { 601 // Set a shorter read deadline that should typically be enough to detect 602 // immediate close but will also not make test hang forever. This 603 // positive case is mostly just a sanity check that the test code here 604 // is actually not failing just due to some other error in the way we 605 // setup TLS. It also sanity checks that we still allow valid TLS conns 606 // but if it produces possible false-positives in CI sometimes that's 607 // not such a huge deal - CI won't be brittle and it will have done it's 608 // job as a sanity check most of the time. 
609 conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) 610 buf := make([]byte, 10) 611 _, err = tlsConn.Read(buf) 612 require.Error(t, err) 613 require.Contains(t, err.Error(), "i/o timeout") 614 } 615 }) 616 } 617} 618 619func connectClient(t *testing.T, s1 *Server, mb pool.RPCType, useTLS, wantOpen bool, message string) net.Conn { 620 t.Helper() 621 622 addr := s1.config.RPCAdvertise 623 tlsWrap := s1.tlsConfigurator.OutgoingRPCWrapper() 624 625 conn, err := net.DialTimeout("tcp", addr.String(), time.Second) 626 require.NoError(t, err) 627 628 // Write magic byte so we aren't timed out 629 outerByte := mb 630 if useTLS { 631 outerByte = pool.RPCTLS 632 } 633 _, err = conn.Write([]byte{byte(outerByte)}) 634 require.NoError(t, err) 635 636 if useTLS { 637 tlsConn, err := tlsWrap(s1.config.Datacenter, conn) 638 // Subtly, tlsWrap will NOT actually do a handshake in this case - it only 639 // does so for some configs, so even if the server closed the conn before 640 // handshake this won't fail and it's only when we attempt to read or write 641 // that we'll see the broken pipe. 642 require.NoError(t, err, "%s: wanted open conn, failed TLS handshake: %s", 643 message, err) 644 conn = tlsConn 645 646 // Write Inner magic byte 647 _, err = conn.Write([]byte{byte(mb)}) 648 if !wantOpen { 649 // TLS Handshake will be done on this attempt to write and should fail 650 require.Error(t, err, "%s: wanted closed conn, TLS Handshake succeeded", message) 651 } else { 652 require.NoError(t, err, "%s: wanted open conn, failed writing inner magic byte: %s", 653 message, err) 654 } 655 } 656 657 // Check if the conn is in the state we want. 658 retry.Run(t, func(r *retry.R) { 659 // Don't wait around as server won't be sending data but the read will fail 660 // immediately if the conn is closed. 
661 conn.SetReadDeadline(time.Now().Add(1 * time.Millisecond)) 662 buf := make([]byte, 10) 663 _, err := conn.Read(buf) 664 require.Error(r, err) 665 if wantOpen { 666 require.Contains(r, err.Error(), "i/o timeout", 667 "%s: wanted an open conn (read timeout)", message) 668 } else { 669 if useTLS { 670 require.Error(r, err) 671 // TLS may fail during either read or write of the handshake so there 672 // are a few different errors that come up. 673 if !strings.Contains(err.Error(), "read: connection reset by peer") && 674 !strings.Contains(err.Error(), "write: connection reset by peer") && 675 !strings.Contains(err.Error(), "write: broken pipe") { 676 r.Fatalf("%s: wanted closed conn got err: %s", message, err) 677 } 678 } else { 679 require.Contains(r, err.Error(), "EOF", "%s: wanted a closed conn", 680 message) 681 } 682 } 683 }) 684 685 return conn 686} 687 688func TestRPC_RPCMaxConnsPerClient(t *testing.T) { 689 if testing.Short() { 690 t.Skip("too slow for testing.Short") 691 } 692 693 t.Parallel() 694 695 cases := []struct { 696 name string 697 magicByte pool.RPCType 698 tlsEnabled bool 699 }{ 700 {"RPC v2", pool.RPCMultiplexV2, false}, 701 {"RPC v2 TLS", pool.RPCMultiplexV2, true}, 702 {"RPC", pool.RPCConsul, false}, 703 {"RPC TLS", pool.RPCConsul, true}, 704 } 705 706 for _, tc := range cases { 707 tc := tc 708 t.Run(tc.name, func(t *testing.T) { 709 dir1, s1 := testServerWithConfig(t, func(c *Config) { 710 c.RPCMaxConnsPerClient = 2 711 if tc.tlsEnabled { 712 c.UseTLS = true 713 c.CAFile = "../../test/hostname/CertAuth.crt" 714 c.CertFile = "../../test/hostname/Alice.crt" 715 c.KeyFile = "../../test/hostname/Alice.key" 716 c.VerifyServerHostname = true 717 c.VerifyOutgoing = true 718 c.VerifyIncoming = false // saves us getting client cert setup 719 c.Domain = "consul" 720 } 721 }) 722 defer os.RemoveAll(dir1) 723 defer s1.Shutdown() 724 725 // Connect to the server with bare TCP 726 conn1 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn1") 
727 defer conn1.Close() 728 729 // Two conns should succeed 730 conn2 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn2") 731 defer conn2.Close() 732 733 // Third should be closed byt the limiter 734 conn3 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, false, "conn3") 735 defer conn3.Close() 736 737 // If we close one of the earlier ones, we should be able to open another 738 addr := conn1.RemoteAddr() 739 conn1.Close() 740 retry.Run(t, func(r *retry.R) { 741 if n := s1.rpcConnLimiter.NumOpen(addr); n >= 2 { 742 r.Fatal("waiting for open conns to drop") 743 } 744 }) 745 conn4 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn4") 746 defer conn4.Close() 747 748 // Reload config with higher limit 749 rc := ReloadableConfig{ 750 RPCRateLimit: s1.config.RPCRateLimit, 751 RPCMaxBurst: s1.config.RPCMaxBurst, 752 RPCMaxConnsPerClient: 10, 753 } 754 require.NoError(t, s1.ReloadConfig(rc)) 755 756 // Now another conn should be allowed 757 conn5 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn5") 758 defer conn5.Close() 759 }) 760 } 761} 762 763func TestRPC_readUint32(t *testing.T) { 764 cases := []struct { 765 name string 766 writeFn func(net.Conn) 767 readFn func(*testing.T, net.Conn) 768 }{ 769 { 770 name: "timeouts irrelevant", 771 writeFn: func(conn net.Conn) { 772 _ = binary.Write(conn, binary.BigEndian, uint32(42)) 773 _ = binary.Write(conn, binary.BigEndian, uint32(math.MaxUint32)) 774 _ = binary.Write(conn, binary.BigEndian, uint32(1)) 775 }, 776 readFn: func(t *testing.T, conn net.Conn) { 777 t.Helper() 778 v, err := readUint32(conn, 5*time.Second) 779 require.NoError(t, err) 780 require.Equal(t, uint32(42), v) 781 782 v, err = readUint32(conn, 5*time.Second) 783 require.NoError(t, err) 784 require.Equal(t, uint32(math.MaxUint32), v) 785 786 v, err = readUint32(conn, 5*time.Second) 787 require.NoError(t, err) 788 require.Equal(t, uint32(1), v) 789 }, 790 }, 791 { 792 name: "triggers timeout on last read", 793 
writeFn: func(conn net.Conn) { 794 _ = binary.Write(conn, binary.BigEndian, uint32(42)) 795 _ = binary.Write(conn, binary.BigEndian, uint32(math.MaxUint32)) 796 _ = binary.Write(conn, binary.BigEndian, uint16(1)) // half as many bytes as expected 797 }, 798 readFn: func(t *testing.T, conn net.Conn) { 799 t.Helper() 800 v, err := readUint32(conn, 5*time.Second) 801 require.NoError(t, err) 802 require.Equal(t, uint32(42), v) 803 804 v, err = readUint32(conn, 5*time.Second) 805 require.NoError(t, err) 806 require.Equal(t, uint32(math.MaxUint32), v) 807 808 _, err = readUint32(conn, 50*time.Millisecond) 809 require.Error(t, err) 810 nerr, ok := err.(net.Error) 811 require.True(t, ok) 812 require.True(t, nerr.Timeout()) 813 }, 814 }, 815 } 816 817 for _, tc := range cases { 818 tc := tc 819 t.Run(tc.name, func(t *testing.T) { 820 var doneWg sync.WaitGroup 821 defer doneWg.Wait() 822 823 client, server := net.Pipe() 824 defer client.Close() 825 defer server.Close() 826 827 // Client pushes some data. 828 doneWg.Add(1) 829 go func() { 830 doneWg.Done() 831 tc.writeFn(client) 832 }() 833 834 // The server tests the function for us. 
835 tc.readFn(t, server) 836 }) 837 } 838} 839 840func TestRPC_LocalTokenStrippedOnForward(t *testing.T) { 841 if testing.Short() { 842 t.Skip("too slow for testing.Short") 843 } 844 845 t.Parallel() 846 dir1, s1 := testServerWithConfig(t, func(c *Config) { 847 c.PrimaryDatacenter = "dc1" 848 c.ACLsEnabled = true 849 c.ACLDefaultPolicy = "deny" 850 c.ACLMasterToken = "root" 851 }) 852 defer os.RemoveAll(dir1) 853 defer s1.Shutdown() 854 testrpc.WaitForLeader(t, s1.RPC, "dc1") 855 codec := rpcClient(t, s1) 856 defer codec.Close() 857 858 dir2, s2 := testServerWithConfig(t, func(c *Config) { 859 c.Datacenter = "dc2" 860 c.PrimaryDatacenter = "dc1" 861 c.ACLsEnabled = true 862 c.ACLDefaultPolicy = "deny" 863 c.ACLTokenReplication = true 864 c.ACLReplicationRate = 100 865 c.ACLReplicationBurst = 100 866 c.ACLReplicationApplyLimit = 1000000 867 }) 868 s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig) 869 testrpc.WaitForLeader(t, s2.RPC, "dc2") 870 defer os.RemoveAll(dir2) 871 defer s2.Shutdown() 872 codec2 := rpcClient(t, s2) 873 defer codec2.Close() 874 875 // Try to join. 876 joinWAN(t, s2, s1) 877 testrpc.WaitForLeader(t, s1.RPC, "dc1") 878 testrpc.WaitForLeader(t, s1.RPC, "dc2") 879 880 // Wait for legacy acls to be disabled so we are clear that 881 // legacy replication isn't meddling. 
882 waitForNewACLs(t, s1) 883 waitForNewACLs(t, s2) 884 waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) 885 886 // create simple kv policy 887 kvPolicy, err := upsertTestPolicyWithRules(codec, "root", "dc1", ` 888 key_prefix "" { policy = "write" } 889 `) 890 require.NoError(t, err) 891 892 // Wait for it to replicate 893 retry.Run(t, func(r *retry.R) { 894 _, p, err := s2.fsm.State().ACLPolicyGetByID(nil, kvPolicy.ID, &structs.EnterpriseMeta{}) 895 require.Nil(r, err) 896 require.NotNil(r, p) 897 }) 898 899 // create local token that only works in DC2 900 localToken2, err := upsertTestToken(codec, "root", "dc2", func(token *structs.ACLToken) { 901 token.Local = true 902 token.Policies = []structs.ACLTokenPolicyLink{ 903 {ID: kvPolicy.ID}, 904 } 905 }) 906 require.NoError(t, err) 907 908 // Try to use it locally (it should work) 909 arg := structs.KVSRequest{ 910 Datacenter: "dc2", 911 Op: api.KVSet, 912 DirEnt: structs.DirEntry{ 913 Key: "foo", 914 Value: []byte("bar"), 915 }, 916 WriteRequest: structs.WriteRequest{Token: localToken2.SecretID}, 917 } 918 var out bool 919 err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out) 920 require.NoError(t, err) 921 require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped") 922 923 // Try to use it remotely 924 arg = structs.KVSRequest{ 925 Datacenter: "dc1", 926 Op: api.KVSet, 927 DirEnt: structs.DirEntry{ 928 Key: "foo", 929 Value: []byte("bar"), 930 }, 931 WriteRequest: structs.WriteRequest{Token: localToken2.SecretID}, 932 } 933 err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out) 934 if !acl.IsErrPermissionDenied(err) { 935 t.Fatalf("err: %v", err) 936 } 937 938 // Update the anon token to also be able to write to kv 939 { 940 tokenUpsertReq := structs.ACLTokenSetRequest{ 941 Datacenter: "dc1", 942 ACLToken: structs.ACLToken{ 943 AccessorID: structs.ACLTokenAnonymousID, 944 Policies: []structs.ACLTokenPolicyLink{ 945 { 946 ID: kvPolicy.ID, 947 
					},
				},
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		token := structs.ACLToken{}
		err = msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &tokenUpsertReq, &token)
		require.NoError(t, err)
		require.NotEmpty(t, token.SecretID)
	}

	// Try to use it remotely again, but this time it should fallback to anon
	arg = structs.KVSRequest{
		Datacenter: "dc1",
		Op:         api.KVSet,
		DirEnt: structs.DirEntry{
			Key:   "foo",
			Value: []byte("bar"),
		},
		WriteRequest: structs.WriteRequest{Token: localToken2.SecretID},
	}
	err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out)
	require.NoError(t, err)
	// The RPC machinery restores the request struct after forwarding; the
	// caller must still see its original (local) token.
	require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped")
}

// TestRPC_LocalTokenStrippedOnForward_GRPC verifies, over the gRPC streaming
// subscribe endpoint, that a token local to dc2 is stripped when a request is
// forwarded to dc1: the remote side falls back to the anonymous token (the
// subscription filters out everything until the anonymous token is granted
// read access), and the caller's request struct has its token restored after
// forwarding.
func TestRPC_LocalTokenStrippedOnForward_GRPC(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	// dc1 is the primary datacenter with ACLs enabled and streaming on.
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.PrimaryDatacenter = "dc1"
		c.ACLsEnabled = true
		c.ACLDefaultPolicy = "deny"
		c.ACLMasterToken = "root"
		c.RPCConfig.EnableStreaming = true
	})
	s1.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	testrpc.WaitForLeader(t, s1.RPC, "dc1")
	codec := rpcClient(t, s1)
	defer codec.Close()

	// dc2 is a secondary that replicates tokens from dc1.
	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc2"
		c.PrimaryDatacenter = "dc1"
		c.ACLsEnabled = true
		c.ACLDefaultPolicy = "deny"
		c.ACLTokenReplication = true
		c.ACLReplicationRate = 100
		c.ACLReplicationBurst = 100
		c.ACLReplicationApplyLimit = 1000000
		c.RPCConfig.EnableStreaming = true
	})
	// Replication needs a token with access to the primary.
	s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
	s2.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)
	testrpc.WaitForLeader(t, s2.RPC, "dc2")
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	codec2 := rpcClient(t, s2)
	defer codec2.Close()

	// Try to join.
	joinWAN(t, s2, s1)
	testrpc.WaitForLeader(t, s1.RPC, "dc1")
	// Asking s1 for the dc2 leader also proves WAN forwarding works.
	testrpc.WaitForLeader(t, s1.RPC, "dc2")

	// Wait for legacy acls to be disabled so we are clear that
	// legacy replication isn't meddling.
	waitForNewACLs(t, s1)
	waitForNewACLs(t, s2)
	waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)

	// create simple service policy
	policy, err := upsertTestPolicyWithRules(codec, "root", "dc1", `
	node_prefix "" { policy = "read" }
	service_prefix "" { policy = "read" }
	`)
	require.NoError(t, err)

	// Wait for it to replicate
	retry.Run(t, func(r *retry.R) {
		_, p, err := s2.fsm.State().ACLPolicyGetByID(nil, policy.ID, &structs.EnterpriseMeta{})
		require.Nil(r, err)
		require.NotNil(r, p)
	})

	// create local token that only works in DC2
	localToken2, err := upsertTestToken(codec, "root", "dc2", func(token *structs.ACLToken) {
		token.Local = true
		token.Policies = []structs.ACLTokenPolicyLink{
			{ID: policy.ID},
		}
	})
	require.NoError(t, err)

	runStep(t, "Register a dummy node with a service", func(t *testing.T) {
		req := &structs.RegisterRequest{
			Node:       "node1",
			Address:    "3.4.5.6",
			Datacenter: "dc1",
			Service: &structs.NodeService{
				ID:      "redis1",
				Service: "redis",
				Address: "3.4.5.6",
				Port:    8080,
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		var out struct{}
		require.NoError(t, s1.RPC("Catalog.Register", &req, &out))
	})

	// Build a gRPC connection to dc2 through a client agent joined to its LAN.
	var conn *grpc.ClientConn
	{
		client, builder := newClientWithGRPCResolver(t, func(c *Config) {
			c.Datacenter = "dc2"
			c.PrimaryDatacenter = "dc1"
			c.RPCConfig.EnableStreaming = true
		})
		joinLAN(t, client, s2)
		testrpc.WaitForTestAgent(t, client.RPC, "dc2", testrpc.WithToken("root"))

		pool := agent_grpc.NewClientConnPool(agent_grpc.ClientConnPoolConfig{
			Servers:               builder,
			DialingFromServer:     false,
			DialingFromDatacenter: "dc2",
		})

		conn, err = pool.ClientConn("dc2")
		require.NoError(t, err)
	}

	// Try to use it locally (it should work)
	runStep(t, "token used locally should work", func(t *testing.T) {
		arg := &pbsubscribe.SubscribeRequest{
			Topic:      pbsubscribe.Topic_ServiceHealth,
			Key:        "redis",
			Token:      localToken2.SecretID,
			Datacenter: "dc2",
		}
		event, err := getFirstSubscribeEventOrError(conn, arg)
		require.NoError(t, err)
		require.NotNil(t, event)

		// make sure that token restore defer works
		require.Equal(t, localToken2.SecretID, arg.Token, "token should not be stripped")
	})

	runStep(t, "token used remotely should not work", func(t *testing.T) {
		arg := &pbsubscribe.SubscribeRequest{
			Topic:      pbsubscribe.Topic_ServiceHealth,
			Key:        "redis",
			Token:      localToken2.SecretID,
			Datacenter: "dc1",
		}

		event, err := getFirstSubscribeEventOrError(conn, arg)

		// NOTE: the subscription endpoint is a filtering style instead of a
		// hard-fail style so when the token isn't present 100% of the data is
		// filtered out leading to a stream with an empty snapshot.
		require.NoError(t, err)
		require.IsType(t, &pbsubscribe.Event_EndOfSnapshot{}, event.Payload)
		require.True(t, event.Payload.(*pbsubscribe.Event_EndOfSnapshot).EndOfSnapshot)
	})

	runStep(t, "update anonymous token to read services", func(t *testing.T) {
		tokenUpsertReq := structs.ACLTokenSetRequest{
			Datacenter: "dc1",
			ACLToken: structs.ACLToken{
				AccessorID: structs.ACLTokenAnonymousID,
				Policies: []structs.ACLTokenPolicyLink{
					{ID: policy.ID},
				},
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		token := structs.ACLToken{}
		err = msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &tokenUpsertReq, &token)
		require.NoError(t, err)
		require.NotEmpty(t, token.SecretID)
	})

	runStep(t, "token used remotely should fallback on anonymous token now", func(t *testing.T) {
		arg := &pbsubscribe.SubscribeRequest{
			Topic:      pbsubscribe.Topic_ServiceHealth,
			Key:        "redis",
			Token:      localToken2.SecretID,
			Datacenter: "dc1",
		}

		event, err := getFirstSubscribeEventOrError(conn, arg)
		require.NoError(t, err)
		require.NotNil(t, event)

		// So now that we can read data, we should get a snapshot with just instances of the "consul" service.
		require.NoError(t, err)

		require.IsType(t, &pbsubscribe.Event_ServiceHealth{}, event.Payload)
		esh := event.Payload.(*pbsubscribe.Event_ServiceHealth)

		require.Equal(t, pbsubscribe.CatalogOp_Register, esh.ServiceHealth.Op)
		csn := esh.ServiceHealth.CheckServiceNode

		// The snapshot must contain the node/service registered above.
		require.NotNil(t, csn)
		require.NotNil(t, csn.Node)
		require.Equal(t, "node1", csn.Node.Node)
		require.Equal(t, "3.4.5.6", csn.Node.Address)
		require.NotNil(t, csn.Service)
		require.Equal(t, "redis1", csn.Service.ID)
		require.Equal(t, "redis", csn.Service.Service)

		// make sure that token restore defer works
		require.Equal(t, localToken2.SecretID, arg.Token, "token should not be stripped")
	})
}

// TestCanRetry exercises canRetry: retries are expected for chunking-resubmit
// errors and no-leader errors (including wrapped forms), and for io.EOF only
// when the request reports itself as a read.
func TestCanRetry(t *testing.T) {
	type testCase struct {
		name     string
		req      structs.RPCInfo
		err      error
		expected bool
	}

	run := func(t *testing.T, tc testCase) {
		require.Equal(t, tc.expected, canRetry(tc.req, tc.err))
	}

	var testCases = []testCase{
		{
			name:     "unexpected error",
			err:      fmt.Errorf("some arbitrary error"),
			expected: false,
		},
		{
			name:     "checking error",
			err:      fmt.Errorf("some wrapping :%w", ErrChunkingResubmit),
			expected: true,
		},
		{
			name:     "no leader error",
			err:      fmt.Errorf("some wrapping: %w", structs.ErrNoLeader),
			expected: true,
		},
		{
			name:     "EOF on read request",
			req:      isReadRequest{},
			err:      io.EOF,
			expected: true,
		},
		{
			// req is nil here, so the request is not a read.
			name:     "EOF on write request",
			err:      io.EOF,
			expected: false,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			run(t, tc)
		})
	}
}

// isReadRequest is a stub RPCInfo whose IsRead always reports true, used by
// TestCanRetry to simulate a read request.
type isReadRequest struct {
	structs.RPCInfo
}

// IsRead marks the request as a read for retry purposes.
func (r isReadRequest) IsRead() bool {
	return true
}

// TestRPC_AuthorizeRaftRPC checks which client certificates are accepted on
// the Raft RPC path, for both the legacy single-TLS-byte scheme and native
// (ALPN) TLS.
func TestRPC_AuthorizeRaftRPC(t *testing.T) {
	caPEM, caPK, err :=
	tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
	require.NoError(t, err)

	caSigner, err := tlsutil.ParseSigner(caPK)
	require.NoError(t, err)

	// Write the CA (and below, an intermediate) to disk so they can be
	// referenced by file path in the server and client TLS configs.
	dir := testutil.TempDir(t, "certs")
	err = ioutil.WriteFile(filepath.Join(dir, "ca.pem"), []byte(caPEM), 0600)
	require.NoError(t, err)

	intermediatePEM, intermediatePK, err := tlsutil.GenerateCert(tlsutil.CertOpts{IsCA: true, CA: caPEM, Signer: caSigner, Days: 5})
	require.NoError(t, err)

	err = ioutil.WriteFile(filepath.Join(dir, "intermediate.pem"), []byte(intermediatePEM), 0600)
	require.NoError(t, err)

	// newCert generates a cert/key pair for the given node/name signed by pk
	// and writes them to <node>-<name>.pem / <node>-<name>.key under dir.
	newCert := func(t *testing.T, caPEM, pk, node, name string) {
		t.Helper()

		signer, err := tlsutil.ParseSigner(pk)
		require.NoError(t, err)

		pem, key, err := tlsutil.GenerateCert(tlsutil.CertOpts{
			Signer:      signer,
			CA:          caPEM,
			Name:        name,
			Days:        5,
			DNSNames:    []string{node + "." + name, name, "localhost"},
			ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
		})
		require.NoError(t, err)

		err = ioutil.WriteFile(filepath.Join(dir, node+"-"+name+".pem"), []byte(pem), 0600)
		require.NoError(t, err)
		err = ioutil.WriteFile(filepath.Join(dir, node+"-"+name+".key"), []byte(key), 0600)
		require.NoError(t, err)
	}

	// The server's own certificate.
	newCert(t, caPEM, caPK, "srv1", "server.dc1.consul")

	_, connectCApk, err := connect.GeneratePrivateKey()
	require.NoError(t, err)

	_, srv := testServerWithConfig(t, func(c *Config) {
		c.Domain = "consul." // consul. is the default value in agent/config
		c.CAFile = filepath.Join(dir, "ca.pem")
		c.CertFile = filepath.Join(dir, "srv1-server.dc1.consul.pem")
		c.KeyFile = filepath.Join(dir, "srv1-server.dc1.consul.key")
		c.VerifyIncoming = true
		c.VerifyServerHostname = true
		// Enable Auto-Encrypt so that Connect CA roots are added to the
		// tlsutil.Configurator.
		c.AutoEncryptAllowTLS = true
		c.CAConfig = &structs.CAConfiguration{
			ClusterID: connect.TestClusterID,
			Provider:  structs.ConsulCAProvider,
			Config:    map[string]interface{}{"PrivateKey": connectCApk},
		}
	})
	defer srv.Shutdown()

	// Wait for ConnectCA initiation to complete.
	retry.Run(t, func(r *retry.R) {
		_, root := srv.caManager.getCAProvider()
		if root == nil {
			r.Fatal("ConnectCA root is still nil")
		}
	})

	// useTLSByte dials the server through the Raft layer with TLS forced on,
	// exercising the legacy byte-prefixed RPC upgrade path.
	useTLSByte := func(t *testing.T, c *tlsutil.Configurator) net.Conn {
		wrapper := tlsutil.SpecificDC("dc1", c.OutgoingRPCWrapper())
		tlsEnabled := func(_ raft.ServerAddress) bool {
			return true
		}

		rl := NewRaftLayer(nil, nil, wrapper, tlsEnabled)
		conn, err := rl.Dial(raft.ServerAddress(srv.Listener.Addr().String()), 100*time.Millisecond)
		require.NoError(t, err)
		return conn
	}

	// useNativeTLS dials raw TCP and wraps it with the ALPN RPC wrapper,
	// negotiating the Raft protocol natively over TLS.
	useNativeTLS := func(t *testing.T, c *tlsutil.Configurator) net.Conn {
		wrapper := c.OutgoingALPNRPCWrapper()
		dialer := &net.Dialer{Timeout: 100 * time.Millisecond}

		rawConn, err := dialer.Dial("tcp", srv.Listener.Addr().String())
		require.NoError(t, err)

		tlsConn, err := wrapper("dc1", "srv1", pool.ALPN_RPCRaft, rawConn)
		require.NoError(t, err)
		return tlsConn
	}

	// setupAgentTLSCert returns a setup func producing a cert signed directly
	// by the root CA, returning the cert/key path prefix.
	setupAgentTLSCert := func(name string) func(t *testing.T) string {
		return func(t *testing.T) string {
			newCert(t, caPEM, caPK, "node1", name)
			return filepath.Join(dir, "node1-"+name)
		}
	}

	// setupAgentTLSCertWithIntermediate signs the cert with the intermediate
	// CA and appends the intermediate PEM to the cert file, so the client
	// presents a chain through an intermediate (see the matching test case).
	setupAgentTLSCertWithIntermediate := func(name string) func(t *testing.T) string {
		return func(t *testing.T) string {
			newCert(t, intermediatePEM, intermediatePK, "node1", name)
			certPrefix := filepath.Join(dir, "node1-"+name)
			f, err := os.OpenFile(certPrefix+".pem", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
			if err != nil {
				t.Fatal(err)
			}
			if _, err := f.Write([]byte(intermediatePEM)); err != nil {
				t.Fatal(err)
			}
			if err := f.Close(); err != nil {
				t.Fatal(err)
			}
			return certPrefix
		}
	}

	// setupConnectCACert signs the cert with the Connect CA root instead of
	// the agent TLS CA; such certs must NOT authorize Raft RPC.
	setupConnectCACert := func(name string) func(t *testing.T) string {
		return func(t *testing.T) string {
			_, caRoot := srv.caManager.getCAProvider()
			newCert(t, caRoot.RootCert, connectCApk, "node1", name)
			return filepath.Join(dir, "node1-"+name)
		}
	}

	type testCase struct {
		name        string
		conn        func(t *testing.T, c *tlsutil.Configurator) net.Conn
		setupCert   func(t *testing.T) string
		expectError bool
	}

	run := func(t *testing.T, tc testCase) {
		certPath := tc.setupCert(t)

		cfg := tlsutil.Config{
			VerifyOutgoing:       true,
			VerifyServerHostname: true,
			CAFile:               filepath.Join(dir, "ca.pem"),
			CertFile:             certPath + ".pem",
			KeyFile:              certPath + ".key",
			Domain:               "consul",
		}
		c, err := tlsutil.NewConfigurator(cfg, hclog.New(nil))
		require.NoError(t, err)

		// An unauthorized cert should cause the server to drop the
		// connection rather than answer the Raft RPC.
		_, err = doRaftRPC(tc.conn(t, c), srv.config.NodeName)
		if tc.expectError {
			if !isConnectionClosedError(err) {
				t.Fatalf("expected a connection closed error, got: %v", err)
			}
			return
		}
		require.NoError(t, err)
	}

	var testCases = []testCase{
		{
			name:        "TLS byte with client cert",
			setupCert:   setupAgentTLSCert("client.dc1.consul"),
			conn:        useTLSByte,
			expectError: true,
		},
		{
			name:        "TLS byte with server cert in different DC",
			setupCert:   setupAgentTLSCert("server.dc2.consul"),
			conn:        useTLSByte,
			expectError: true,
		},
		{
			name:      "TLS byte with server cert in same DC",
			setupCert: setupAgentTLSCert("server.dc1.consul"),
			conn:      useTLSByte,
		},
		{
			name:      "TLS byte with server cert in same DC and with unknown intermediate",
			setupCert: setupAgentTLSCertWithIntermediate("server.dc1.consul"),
			conn:      useTLSByte,
		},
		{
			name:        "TLS byte with ConnectCA leaf cert",
			setupCert:   setupConnectCACert("server.dc1.consul"),
			conn:        useTLSByte,
			expectError: true,
		},
		{
			name:        "native TLS with client cert",
			setupCert:   setupAgentTLSCert("client.dc1.consul"),
			conn:        useNativeTLS,
			expectError: true,
		},
		{
			name:        "native TLS with server cert in different DC",
			setupCert:   setupAgentTLSCert("server.dc2.consul"),
			conn:        useNativeTLS,
			expectError: true,
		},
		{
			name:      "native TLS with server cert in same DC",
			setupCert: setupAgentTLSCert("server.dc1.consul"),
			conn:      useNativeTLS,
		},
		{
			name:        "native TLS with ConnectCA leaf cert",
			setupCert:   setupConnectCACert("server.dc1.consul"),
			conn:        useNativeTLS,
			expectError: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			run(t, tc)
		})
	}
}

// doRaftRPC sends a single hand-rolled AppendEntries RPC over conn and
// returns the decoded response. The payload values are arbitrary; the test
// only cares whether the server accepts or drops the connection.
func doRaftRPC(conn net.Conn, leader string) (raft.AppendEntriesResponse, error) {
	var resp raft.AppendEntriesResponse

	var term uint64 = 0xc
	a := raft.AppendEntriesRequest{
		RPCHeader:         raft.RPCHeader{ProtocolVersion: 3},
		Term:              0,
		Leader:            []byte(leader),
		PrevLogEntry:      0,
		PrevLogTerm:       term,
		LeaderCommitIndex: 50,
	}

	if err := appendEntries(conn, a, &resp); err != nil {
		return resp, err
	}
	return resp, nil
}

// appendEntries writes one raft AppendEntries RPC to conn (type byte followed
// by the msgpack-encoded request) and decodes the response into resp.
func appendEntries(conn net.Conn, req raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error {
	w := bufio.NewWriter(conn)
	enc := codec.NewEncoder(w,
		&codec.MsgpackHandle{})

	// Raft's net transport prefixes each RPC with a type byte; 0 is
	// AppendEntries (see raft/net_transport.go).
	const rpcAppendEntries = 0
	if err := w.WriteByte(rpcAppendEntries); err != nil {
		return fmt.Errorf("failed to write raft-RPC byte: %w", err)
	}

	if err := enc.Encode(req); err != nil {
		return fmt.Errorf("failed to send append entries RPC: %w", err)
	}
	if err := w.Flush(); err != nil {
		return fmt.Errorf("failed to flush RPC: %w", err)
	}

	if err := decodeRaftRPCResponse(conn, resp); err != nil {
		return fmt.Errorf("response error: %w", err)
	}
	return nil
}

// decodeRaftRPCResponse reads a raft RPC reply from conn: first an error
// string, then the msgpack-encoded response body. A non-empty error string is
// returned as an error after the body has been decoded.
// copied and modified from raft/net_transport.go
func decodeRaftRPCResponse(conn net.Conn, resp *raft.AppendEntriesResponse) error {
	r := bufio.NewReader(conn)
	dec := codec.NewDecoder(r, &codec.MsgpackHandle{})

	var rpcError string
	if err := dec.Decode(&rpcError); err != nil {
		return fmt.Errorf("failed to decode response error: %w", err)
	}
	if err := dec.Decode(resp); err != nil {
		return fmt.Errorf("failed to decode response: %w", err)
	}
	if rpcError != "" {
		return fmt.Errorf("rpc error: %v", rpcError)
	}
	return nil
}

// isConnectionClosedError reports whether err indicates the peer closed or
// reset the connection: io.EOF (possibly wrapped) or a
// "connection reset by peer" message. A nil err returns false.
func isConnectionClosedError(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, io.EOF):
		return true
	case strings.Contains(err.Error(), "connection reset by peer"):
		return true
	default:
		return false
	}
}

// getFirstSubscribeEventOrError opens a subscription stream on conn and
// returns the first event received. A clean end-of-stream (io.EOF) yields
// (nil, nil); any other subscribe or receive failure is returned as an error.
func getFirstSubscribeEventOrError(conn *grpc.ClientConn, req *pbsubscribe.SubscribeRequest) (*pbsubscribe.Event, error) {
	streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
	// Cancel on return so the server tears down the stream promptly.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	handle, err := streamClient.Subscribe(ctx, req)
	if err != nil {
		return nil, err
	}

	event, err := handle.Recv()
	if err == io.EOF {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	return event, nil
}