package consul

import (
	"bufio"
	"bytes"
	"context"
	"crypto/x509"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"net"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/go-msgpack/codec"
	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
	"github.com/hashicorp/raft"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/connect"
	"github.com/hashicorp/consul/agent/consul/state"
	agent_grpc "github.com/hashicorp/consul/agent/grpc"
	"github.com/hashicorp/consul/agent/pool"
	"github.com/hashicorp/consul/agent/structs"
	tokenStore "github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/proto/pbsubscribe"
	"github.com/hashicorp/consul/sdk/testutil"
	"github.com/hashicorp/consul/sdk/testutil/retry"
	"github.com/hashicorp/consul/testrpc"
	"github.com/hashicorp/consul/tlsutil"
)

func TestRPC_NoLeader_Fail(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.RPCHoldTimeout = 1 * time.Millisecond
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	arg := structs.RegisterRequest{
		Datacenter: "dc1",
		Node:       "foo",
		Address:    "127.0.0.1",
	}
	var out struct{}

	// Make sure we eventually fail with a no leader error, which we should
	// see given the short timeout.
	err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
	if err == nil || err.Error() != structs.ErrNoLeader.Error() {
		t.Fatalf("bad: %v", err)
	}

	// Now make sure it goes through.
	testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
	err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
	if err != nil {
		t.Fatalf("bad: %v", err)
	}
}

func TestRPC_NoLeader_Fail_on_stale_read(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.RPCHoldTimeout = 1 * time.Millisecond
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	arg := structs.RegisterRequest{
		Datacenter: "dc1",
		Node:       "foo",
		Address:    "127.0.0.1",
	}
	var out struct{}

	// Make sure we eventually fail with a no leader error, which we should
	// see given the short timeout.
	err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
	if err == nil || err.Error() != structs.ErrNoLeader.Error() {
		t.Fatalf("bad: %v", err)
	}

	// While no leader has ever been known, even stale reads should fail.
	getKeysReq := structs.KeyListRequest{
		Datacenter:   "dc1",
		Prefix:       "",
		Seperator:    "/",
		QueryOptions: structs.QueryOptions{AllowStale: true},
	}
	var keyList structs.IndexedKeyList
	err = msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList)
	if err == nil || err.Error() != structs.ErrNoLeader.Error() {
		t.Fatalf("expected %v but got err: %v", structs.ErrNoLeader, err)
	}

	testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
	if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err != nil {
		t.Fatalf("Did not expect any error but got err: %v", err)
	}
}

func TestRPC_NoLeader_Retry(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.RPCHoldTimeout = 10 * time.Second
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	arg := structs.RegisterRequest{
		Datacenter: "dc1",
		Node:       "foo",
		Address:    "127.0.0.1",
	}
	var out struct{}

	// This isn't sure-fire but tries to check that we don't have a
	// leader going into the RPC, so we exercise the retry logic.
	if ok, _, _ := s1.getLeader(); ok {
		t.Fatalf("should not have a leader yet")
	}

	// The timeout is long enough to ride out any reasonable leader
	// election.
	err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
	if err != nil {
		t.Fatalf("bad: %v", err)
	}
}

func TestRPC_getLeader_ErrLeaderNotTracked(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	cluster := newTestCluster(t, &testClusterConfig{
		Datacenter: "dc1",
		Servers:    3,
		ServerWait: func(t *testing.T, srv *Server) {
			// The test cluster waits for a leader to be established, but not
			// for every server's RPC tracking to be updated, so we also wait
			// for that here.
			retry.Run(t, func(r *retry.R) {
				if !srv.IsLeader() {
					_, _, err := srv.getLeader()
					require.NoError(r, err)
				}
			})
		},
	})

	// At this point we know we have a cluster with a leader, and all followers
	// are tracking that leader in their serverLookup struct. We need to find a
	// follower and tamper with its server lookup to force the error we want.
	var follower *Server
	for _, srv := range cluster.Servers {
		if !srv.IsLeader() {
			follower = srv
			break
		}
	}

	_, leaderMeta, err := follower.getLeader()
	require.NoError(t, err)

	// Now do some behind-the-scenes trickery on the follower's server lookup,
	// removing the leader from it so that we can force an ErrLeaderNotTracked
	// error.
	follower.serverLookup.RemoveServer(leaderMeta)

	isLeader, meta, err := follower.getLeader()
	require.Error(t, err)
	require.True(t, errors.Is(err, structs.ErrLeaderNotTracked))
	require.Nil(t, meta)
	require.False(t, isLeader)
}

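// MockSink is an in-memory raft.SnapshotSink that lets the tests below
// persist and restore FSM snapshots without touching disk.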
type MockSink struct {
	*bytes.Buffer
	cancel bool
}

func (m *MockSink) ID() string {
	return "Mock"
}

func (m *MockSink) Cancel() error {
	m.cancel = true
	return nil
}

func (m *MockSink) Close() error {
	return nil
}

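// For context, a production read endpoint drives blockingQuery roughly like
// the hedged sketch below; the endpoint name and the exact state-store call
// are illustrative assumptions, not an actual Consul endpoint:
//
//	func (s *Server) exampleListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error {
//		return s.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
//			func(ws memdb.WatchSet, store *state.Store) error {
//				// Read state, register watch channels on ws, and set
//				// reply.Index so blockingQuery can tell whether the client's
//				// MinQueryIndex has been passed yet.
//				index, nodes, err := store.Nodes(ws, nil)
//				if err != nil {
//					return err
//				}
//				reply.Index, reply.Nodes = index, nodes
//				return nil
//			})
//	}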
func TestRPC_blockingQuery(t *testing.T) {
	t.Parallel()
	dir, s := testServer(t)
	defer os.RemoveAll(dir)
	defer s.Shutdown()

	require := require.New(t)
	assert := assert.New(t)

	// Perform a non-blocking query. Note that it's significant that the meta has
	// a zero index in the response - the implied opts.MinQueryIndex is also zero,
	// but this should still not block.
	{
		var opts structs.QueryOptions
		var meta structs.QueryMeta
		var calls int
		fn := func(_ memdb.WatchSet, _ *state.Store) error {
			calls++
			return nil
		}
		if err := s.blockingQuery(&opts, &meta, fn); err != nil {
			t.Fatalf("err: %v", err)
		}
		if calls != 1 {
			t.Fatalf("bad: %d", calls)
		}
	}

	// Perform a blocking query that gets woken up and loops around once.
	{
		opts := structs.QueryOptions{
			MinQueryIndex: 3,
		}
		var meta structs.QueryMeta
		var calls int
		fn := func(ws memdb.WatchSet, _ *state.Store) error {
			if calls == 0 {
				meta.Index = 3

				fakeCh := make(chan struct{})
				close(fakeCh)
				ws.Add(fakeCh)
			} else {
				meta.Index = 4
			}
			calls++
			return nil
		}
		if err := s.blockingQuery(&opts, &meta, fn); err != nil {
			t.Fatalf("err: %v", err)
		}
		if calls != 2 {
			t.Fatalf("bad: %d", calls)
		}
	}

	// Perform a blocking query that returns a zero index from the blocking func
	// (e.g. no state yet). This should still return an empty response
	// immediately, but with an index of 1, and then block on the next attempt.
	// In one sense a zero index is not really a valid, non-error response from a
	// state method, but in practice a lot of state store operations do return it
	// unless they include explicit checks to turn 0 into 1. Often this is not
	// caught or covered by tests, but when eventually hit in the wild it causes
	// blocking clients to busy-loop and burn CPU. This test ensures that
	// blockingQuery systematically does the right thing to prevent future bugs
	// like that.
	{
		opts := structs.QueryOptions{
			MinQueryIndex: 0,
		}
		var meta structs.QueryMeta
		var calls int
		fn := func(ws memdb.WatchSet, _ *state.Store) error {
			if opts.MinQueryIndex > 0 {
				// If the client requested blocking, block forever. This simulates
				// waiting for the watched resource to be initialized/written,
				// giving it a non-zero index. Note that the timeout on the query
				// options is relied on to stop the test from taking forever.
				fakeCh := make(chan struct{})
				ws.Add(fakeCh)
			}
			meta.Index = 0
			calls++
			return nil
		}
		require.NoError(s.blockingQuery(&opts, &meta, fn))
		assert.Equal(1, calls)
		assert.Equal(uint64(1), meta.Index,
			"expect fake index of 1 to force client to block on next update")

		// Simulate the client making the next request
		opts.MinQueryIndex = 1
		opts.MaxQueryTime = 20 * time.Millisecond // Don't wait too long

		// This time we should block even though the func still returns index 0
		t0 := time.Now()
		require.NoError(s.blockingQuery(&opts, &meta, fn))
		t1 := time.Now()
		assert.Equal(2, calls)
		assert.Equal(uint64(1), meta.Index,
			"expect fake index of 1 to force client to block on next update")
		assert.True(t1.Sub(t0) > 20*time.Millisecond,
			"should have actually blocked waiting for timeout")
	}

	// Perform a query that blocks and gets interrupted when the state store
	// is abandoned.
	{
		opts := structs.QueryOptions{
			MinQueryIndex: 3,
		}
		var meta structs.QueryMeta
		var calls int
		fn := func(_ memdb.WatchSet, _ *state.Store) error {
			if calls == 0 {
				meta.Index = 3

				snap, err := s.fsm.Snapshot()
				if err != nil {
					t.Fatalf("err: %v", err)
				}
				defer snap.Release()

				buf := bytes.NewBuffer(nil)
				sink := &MockSink{buf, false}
				if err := snap.Persist(sink); err != nil {
					t.Fatalf("err: %v", err)
				}

				if err := s.fsm.Restore(sink); err != nil {
					t.Fatalf("err: %v", err)
				}
			}
			calls++
			return nil
		}
		if err := s.blockingQuery(&opts, &meta, fn); err != nil {
			t.Fatalf("err: %v", err)
		}
		if calls != 1 {
			t.Fatalf("bad: %d", calls)
		}
	}
}

func TestRPC_ReadyForConsistentReads(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	dir, s := testServerWithConfig(t, func(c *Config) {
		c.RPCHoldTimeout = 2 * time.Millisecond
	})
	defer os.RemoveAll(dir)
	defer s.Shutdown()

	testrpc.WaitForLeader(t, s.RPC, "dc1")

	if !s.isReadyForConsistentReads() {
		t.Fatal("Server should be ready for consistent reads")
	}

	s.resetConsistentReadReady()
	err := s.consistentRead()
	if err.Error() != "Not ready to serve consistent reads" {
		t.Fatal("Server should NOT be ready for consistent reads")
	}

	go func() {
		time.Sleep(100 * time.Millisecond)
		s.setConsistentReadReady()
	}()

	retry.Run(t, func(r *retry.R) {
		if err := s.consistentRead(); err != nil {
			r.Fatalf("Expected server to be ready for consistent reads, got error %v", err)
		}
	})
}

func TestRPC_MagicByteTimeout(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.RPCHandshakeTimeout = 10 * time.Millisecond
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	// Connect to the server with bare TCP to simulate a malicious client trying
	// to hold open resources.
	addr := s1.config.RPCAdvertise
	conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
	require.NoError(t, err)
	defer conn.Close()

	// Wait for more than the timeout. This is timing dependent, so it could fail
	// if the CPU is overloaded enough to starve the handler goroutine; the retry
	// loop below guards against that, but this feels like a pretty generous
	// margin for error (10x the timeout plus 100ms of scheduling time).
	time.Sleep(100 * time.Millisecond)

	// Set a read deadline on the Conn so that, if the handshake timeout is not
	// working, the read below doesn't block forever. It needs to be much longer
	// than what we expect, and the resulting error should be different too.
	conn.SetReadDeadline(time.Now().Add(3 * time.Second))

	retry.Run(t, func(r *retry.R) {
		// Sanity check the conn was closed by attempting to read from it (a write
		// might not detect the close).
		buf := make([]byte, 10)
		_, err = conn.Read(buf)
		require.Error(r, err)
		require.Contains(r, err.Error(), "EOF")
	})
}

func TestRPC_TLSHandshakeTimeout(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.RPCHandshakeTimeout = 10 * time.Millisecond
		c.UseTLS = true
		c.CAFile = "../../test/hostname/CertAuth.crt"
		c.CertFile = "../../test/hostname/Alice.crt"
		c.KeyFile = "../../test/hostname/Alice.key"
		c.VerifyServerHostname = true
		c.VerifyOutgoing = true
		c.VerifyIncoming = true
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	// Connect to the server with the TLS magic byte delivered on time
	addr := s1.config.RPCAdvertise
	conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
	require.NoError(t, err)
	defer conn.Close()

	// Write the TLS byte to avoid being closed by either the (outer) first byte
	// timeout or the fact that the server requires TLS
	_, err = conn.Write([]byte{byte(pool.RPCTLS)})
	require.NoError(t, err)

	// Wait for more than the timeout before we start a TLS handshake. This is
	// timing dependent, so it could fail if the CPU is overloaded enough to
	// starve the handler goroutine; the retry loop below guards against that,
	// but this feels like a pretty generous margin for error (10x the timeout
	// plus 100ms of scheduling time).
	time.Sleep(100 * time.Millisecond)

	// Set a read deadline on the Conn so that, if the handshake timeout is not
	// working, the read below doesn't block forever. It needs to be much longer
	// than what we expect, and the resulting error should be different too.
	conn.SetReadDeadline(time.Now().Add(3 * time.Second))

	retry.Run(t, func(r *retry.R) {
		// Sanity check the conn was closed by attempting to read from it (a write
		// might not detect the close).
		buf := make([]byte, 10)
		_, err = conn.Read(buf)
		require.Error(r, err)
		require.Contains(r, err.Error(), "EOF")
	})
}

func TestRPC_PreventsTLSNesting(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	cases := []struct {
		name      string
		outerByte pool.RPCType
		innerByte pool.RPCType
		wantClose bool
	}{
		{
			// Base case, sanity check normal RPC in TLS works
			name:      "RPC in TLS",
			outerByte: pool.RPCTLS,
			innerByte: pool.RPCConsul,
			wantClose: false,
		},
		{
			// Nested TLS-in-TLS
			name:      "TLS in TLS",
			outerByte: pool.RPCTLS,
			innerByte: pool.RPCTLS,
			wantClose: true,
		},
		{
			// Nested TLS-in-TLS
			name:      "TLS in Insecure TLS",
			outerByte: pool.RPCTLSInsecure,
			innerByte: pool.RPCTLS,
			wantClose: true,
		},
		{
			// Nested TLS-in-TLS
			name:      "Insecure TLS in TLS",
			outerByte: pool.RPCTLS,
			innerByte: pool.RPCTLSInsecure,
			wantClose: true,
		},
		{
			// Nested TLS-in-TLS
			name:      "Insecure TLS in Insecure TLS",
			outerByte: pool.RPCTLSInsecure,
			innerByte: pool.RPCTLSInsecure,
			wantClose: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			dir1, s1 := testServerWithConfig(t, func(c *Config) {
				c.UseTLS = true
				c.CAFile = "../../test/hostname/CertAuth.crt"
				c.CertFile = "../../test/hostname/Alice.crt"
				c.KeyFile = "../../test/hostname/Alice.key"
				c.VerifyServerHostname = true
				c.VerifyOutgoing = true
				c.VerifyIncoming = false // saves us getting client cert setup
				c.Domain = "consul"
			})
			defer os.RemoveAll(dir1)
			defer s1.Shutdown()

			// Connect to the server with the TLS magic byte delivered on time
			addr := s1.config.RPCAdvertise
			conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
			require.NoError(t, err)
			defer conn.Close()

			// Write the outer magic byte
			_, err = conn.Write([]byte{byte(tc.outerByte)})
			require.NoError(t, err)

			// Start the TLS client
			tlsWrap := s1.tlsConfigurator.OutgoingRPCWrapper()
			tlsConn, err := tlsWrap("dc1", conn)
			require.NoError(t, err)

			// Write the inner magic byte
			_, err = tlsConn.Write([]byte{byte(tc.innerByte)})
			require.NoError(t, err)

			if tc.wantClose {
				// Allow up to a second for a read failure to indicate the conn was
				// closed by the server.
				conn.SetReadDeadline(time.Now().Add(1 * time.Second))

				retry.Run(t, func(r *retry.R) {
					// Sanity check the conn was closed by attempting to read from it (a
					// write might not detect the close).
					buf := make([]byte, 10)
					_, err = tlsConn.Read(buf)
					require.Error(r, err)
					require.Contains(r, err.Error(), "EOF")
				})
			} else {
				// Set a shorter read deadline that should typically be enough to
				// detect an immediate close but will also not make the test hang
				// forever. This positive case is mostly just a sanity check that the
				// test code here is not failing due to some other error in the way we
				// set up TLS. It also sanity checks that we still allow valid TLS
				// conns, and if it sometimes produces false positives in CI that's
				// not such a huge deal - it won't make CI brittle and it will have
				// done its job as a sanity check most of the time.
				conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
				buf := make([]byte, 10)
				_, err = tlsConn.Read(buf)
				require.Error(t, err)
				require.Contains(t, err.Error(), "i/o timeout")
			}
		})
	}
}

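// connectClient dials the server's RPC address, writes the protocol magic
// byte (wrapping the conn in TLS first when useTLS is set), and then asserts
// via short reads that the connection ended up open or closed per wantOpen.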
func connectClient(t *testing.T, s1 *Server, mb pool.RPCType, useTLS, wantOpen bool, message string) net.Conn {
	t.Helper()

	addr := s1.config.RPCAdvertise
	tlsWrap := s1.tlsConfigurator.OutgoingRPCWrapper()

	conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
	require.NoError(t, err)

	// Write the magic byte so we aren't timed out
	outerByte := mb
	if useTLS {
		outerByte = pool.RPCTLS
	}
	_, err = conn.Write([]byte{byte(outerByte)})
	require.NoError(t, err)

	if useTLS {
		tlsConn, err := tlsWrap(s1.config.Datacenter, conn)
		// Subtly, tlsWrap will NOT actually do a handshake in this case - it only
		// does so for some configs - so even if the server closed the conn before
		// the handshake, this won't fail; it's only when we attempt to read or
		// write that we'll see the broken pipe.
		require.NoError(t, err, "%s: wanted open conn, failed TLS handshake: %s",
			message, err)
		conn = tlsConn

		// Write the inner magic byte
		_, err = conn.Write([]byte{byte(mb)})
		if !wantOpen {
			// The TLS handshake will be done on this write attempt and should fail
			require.Error(t, err, "%s: wanted closed conn, TLS handshake succeeded", message)
		} else {
			require.NoError(t, err, "%s: wanted open conn, failed writing inner magic byte: %s",
				message, err)
		}
	}

	// Check if the conn is in the state we want.
	retry.Run(t, func(r *retry.R) {
		// Don't wait around as the server won't be sending data, but the read
		// will fail immediately if the conn is closed.
		conn.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
		buf := make([]byte, 10)
		_, err := conn.Read(buf)
		require.Error(r, err)
		if wantOpen {
			require.Contains(r, err.Error(), "i/o timeout",
				"%s: wanted an open conn (read timeout)", message)
		} else {
			if useTLS {
				require.Error(r, err)
				// TLS may fail during either read or write of the handshake so there
				// are a few different errors that come up.
				if !strings.Contains(err.Error(), "read: connection reset by peer") &&
					!strings.Contains(err.Error(), "write: connection reset by peer") &&
					!strings.Contains(err.Error(), "write: broken pipe") {
					r.Fatalf("%s: wanted closed conn got err: %s", message, err)
				}
			} else {
				require.Contains(r, err.Error(), "EOF", "%s: wanted a closed conn",
					message)
			}
		}
	})

	return conn
}

func TestRPC_RPCMaxConnsPerClient(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()

	cases := []struct {
		name       string
		magicByte  pool.RPCType
		tlsEnabled bool
	}{
		{"RPC v2", pool.RPCMultiplexV2, false},
		{"RPC v2 TLS", pool.RPCMultiplexV2, true},
		{"RPC", pool.RPCConsul, false},
		{"RPC TLS", pool.RPCConsul, true},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			dir1, s1 := testServerWithConfig(t, func(c *Config) {
				c.RPCMaxConnsPerClient = 2
				if tc.tlsEnabled {
					c.UseTLS = true
					c.CAFile = "../../test/hostname/CertAuth.crt"
					c.CertFile = "../../test/hostname/Alice.crt"
					c.KeyFile = "../../test/hostname/Alice.key"
					c.VerifyServerHostname = true
					c.VerifyOutgoing = true
					c.VerifyIncoming = false // saves us getting client cert setup
					c.Domain = "consul"
				}
			})
			defer os.RemoveAll(dir1)
			defer s1.Shutdown()

			// Connect to the server with bare TCP
			conn1 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn1")
			defer conn1.Close()

			// Two conns should succeed
			conn2 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn2")
			defer conn2.Close()

			// The third should be closed by the limiter
			conn3 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, false, "conn3")
			defer conn3.Close()

			// If we close one of the earlier ones, we should be able to open another
			addr := conn1.RemoteAddr()
			conn1.Close()
			retry.Run(t, func(r *retry.R) {
				if n := s1.rpcConnLimiter.NumOpen(addr); n >= 2 {
					r.Fatal("waiting for open conns to drop")
				}
			})
			conn4 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn4")
			defer conn4.Close()

			// Reload the config with a higher limit
			rc := ReloadableConfig{
				RPCRateLimit:         s1.config.RPCRateLimit,
				RPCMaxBurst:          s1.config.RPCMaxBurst,
				RPCMaxConnsPerClient: 10,
			}
			require.NoError(t, s1.ReloadConfig(rc))

			// Now another conn should be allowed
			conn5 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn5")
			defer conn5.Close()
		})
	}
}

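// readUint32 (exercised below) reads one big-endian uint32 from the conn
// under the given read deadline. The table covers a well-behaved writer and
// one that sends too few bytes, which must surface as a net.Error timeout
// rather than blocking forever.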
func TestRPC_readUint32(t *testing.T) {
	cases := []struct {
		name    string
		writeFn func(net.Conn)
		readFn  func(*testing.T, net.Conn)
	}{
		{
			name: "timeouts irrelevant",
			writeFn: func(conn net.Conn) {
				_ = binary.Write(conn, binary.BigEndian, uint32(42))
				_ = binary.Write(conn, binary.BigEndian, uint32(math.MaxUint32))
				_ = binary.Write(conn, binary.BigEndian, uint32(1))
			},
			readFn: func(t *testing.T, conn net.Conn) {
				t.Helper()
				v, err := readUint32(conn, 5*time.Second)
				require.NoError(t, err)
				require.Equal(t, uint32(42), v)

				v, err = readUint32(conn, 5*time.Second)
				require.NoError(t, err)
				require.Equal(t, uint32(math.MaxUint32), v)

				v, err = readUint32(conn, 5*time.Second)
				require.NoError(t, err)
				require.Equal(t, uint32(1), v)
			},
		},
		{
			name: "triggers timeout on last read",
			writeFn: func(conn net.Conn) {
				_ = binary.Write(conn, binary.BigEndian, uint32(42))
				_ = binary.Write(conn, binary.BigEndian, uint32(math.MaxUint32))
				_ = binary.Write(conn, binary.BigEndian, uint16(1)) // half as many bytes as expected
			},
			readFn: func(t *testing.T, conn net.Conn) {
				t.Helper()
				v, err := readUint32(conn, 5*time.Second)
				require.NoError(t, err)
				require.Equal(t, uint32(42), v)

				v, err = readUint32(conn, 5*time.Second)
				require.NoError(t, err)
				require.Equal(t, uint32(math.MaxUint32), v)

				_, err = readUint32(conn, 50*time.Millisecond)
				require.Error(t, err)
				nerr, ok := err.(net.Error)
				require.True(t, ok)
				require.True(t, nerr.Timeout())
			},
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			var doneWg sync.WaitGroup
			defer doneWg.Wait()

			client, server := net.Pipe()
			defer client.Close()
			defer server.Close()

			// The client pushes some data; Done is deferred so the WaitGroup
			// actually waits for the write to finish.
			doneWg.Add(1)
			go func() {
				defer doneWg.Done()
				tc.writeFn(client)
			}()

			// The server tests the function for us.
			tc.readFn(t, server)
		})
	}
}

func TestRPC_LocalTokenStrippedOnForward(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.PrimaryDatacenter = "dc1"
		c.ACLsEnabled = true
		c.ACLDefaultPolicy = "deny"
		c.ACLMasterToken = "root"
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	testrpc.WaitForLeader(t, s1.RPC, "dc1")
	codec := rpcClient(t, s1)
	defer codec.Close()

	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc2"
		c.PrimaryDatacenter = "dc1"
		c.ACLsEnabled = true
		c.ACLDefaultPolicy = "deny"
		c.ACLTokenReplication = true
		c.ACLReplicationRate = 100
		c.ACLReplicationBurst = 100
		c.ACLReplicationApplyLimit = 1000000
	})
	s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
	testrpc.WaitForLeader(t, s2.RPC, "dc2")
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()
	codec2 := rpcClient(t, s2)
	defer codec2.Close()

	// Try to join.
	joinWAN(t, s2, s1)
	testrpc.WaitForLeader(t, s1.RPC, "dc1")
	testrpc.WaitForLeader(t, s1.RPC, "dc2")

	// Wait for legacy ACLs to be disabled so we are clear that legacy
	// replication isn't meddling.
	waitForNewACLs(t, s1)
	waitForNewACLs(t, s2)
	waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)

	// Create a simple kv policy.
	kvPolicy, err := upsertTestPolicyWithRules(codec, "root", "dc1", `
	key_prefix "" { policy = "write" }
	`)
	require.NoError(t, err)

	// Wait for it to replicate.
	retry.Run(t, func(r *retry.R) {
		_, p, err := s2.fsm.State().ACLPolicyGetByID(nil, kvPolicy.ID, &structs.EnterpriseMeta{})
		require.Nil(r, err)
		require.NotNil(r, p)
	})

	// Create a local token that only works in DC2.
	localToken2, err := upsertTestToken(codec, "root", "dc2", func(token *structs.ACLToken) {
		token.Local = true
		token.Policies = []structs.ACLTokenPolicyLink{
			{ID: kvPolicy.ID},
		}
	})
	require.NoError(t, err)

	// Try to use it locally (it should work).
	arg := structs.KVSRequest{
		Datacenter: "dc2",
		Op:         api.KVSet,
		DirEnt: structs.DirEntry{
			Key:   "foo",
			Value: []byte("bar"),
		},
		WriteRequest: structs.WriteRequest{Token: localToken2.SecretID},
	}
	var out bool
	err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out)
	require.NoError(t, err)
	require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped")

	// Try to use it remotely.
	arg = structs.KVSRequest{
		Datacenter: "dc1",
		Op:         api.KVSet,
		DirEnt: structs.DirEntry{
			Key:   "foo",
			Value: []byte("bar"),
		},
		WriteRequest: structs.WriteRequest{Token: localToken2.SecretID},
	}
	err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out)
	if !acl.IsErrPermissionDenied(err) {
		t.Fatalf("err: %v", err)
	}

	// Update the anonymous token to also be able to write to kv.
	{
		tokenUpsertReq := structs.ACLTokenSetRequest{
			Datacenter: "dc1",
			ACLToken: structs.ACLToken{
				AccessorID: structs.ACLTokenAnonymousID,
				Policies: []structs.ACLTokenPolicyLink{
					{
						ID: kvPolicy.ID,
					},
				},
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		token := structs.ACLToken{}
		err = msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &tokenUpsertReq, &token)
		require.NoError(t, err)
		require.NotEmpty(t, token.SecretID)
	}

	// Try to use it remotely again; this time it should fall back to the
	// anonymous token.
	arg = structs.KVSRequest{
		Datacenter: "dc1",
		Op:         api.KVSet,
		DirEnt: structs.DirEntry{
			Key:   "foo",
			Value: []byte("bar"),
		},
		WriteRequest: structs.WriteRequest{Token: localToken2.SecretID},
	}
	err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out)
	require.NoError(t, err)
	require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped")
}

func TestRPC_LocalTokenStrippedOnForward_GRPC(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.PrimaryDatacenter = "dc1"
		c.ACLsEnabled = true
		c.ACLDefaultPolicy = "deny"
		c.ACLMasterToken = "root"
		c.RPCConfig.EnableStreaming = true
	})
	s1.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	testrpc.WaitForLeader(t, s1.RPC, "dc1")
	codec := rpcClient(t, s1)
	defer codec.Close()

	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc2"
		c.PrimaryDatacenter = "dc1"
		c.ACLsEnabled = true
		c.ACLDefaultPolicy = "deny"
		c.ACLTokenReplication = true
		c.ACLReplicationRate = 100
		c.ACLReplicationBurst = 100
		c.ACLReplicationApplyLimit = 1000000
		c.RPCConfig.EnableStreaming = true
	})
	s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
	s2.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)
	testrpc.WaitForLeader(t, s2.RPC, "dc2")
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()
	codec2 := rpcClient(t, s2)
	defer codec2.Close()

	// Try to join.
	joinWAN(t, s2, s1)
	testrpc.WaitForLeader(t, s1.RPC, "dc1")
	testrpc.WaitForLeader(t, s1.RPC, "dc2")

	// Wait for legacy ACLs to be disabled so we are clear that legacy
	// replication isn't meddling.
	waitForNewACLs(t, s1)
	waitForNewACLs(t, s2)
	waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)

	// Create a simple service policy.
	policy, err := upsertTestPolicyWithRules(codec, "root", "dc1", `
	node_prefix "" { policy = "read" }
	service_prefix "" { policy = "read" }
	`)
	require.NoError(t, err)

	// Wait for it to replicate.
	retry.Run(t, func(r *retry.R) {
		_, p, err := s2.fsm.State().ACLPolicyGetByID(nil, policy.ID, &structs.EnterpriseMeta{})
		require.Nil(r, err)
		require.NotNil(r, p)
	})

	// Create a local token that only works in DC2.
	localToken2, err := upsertTestToken(codec, "root", "dc2", func(token *structs.ACLToken) {
		token.Local = true
		token.Policies = []structs.ACLTokenPolicyLink{
			{ID: policy.ID},
		}
	})
	require.NoError(t, err)

	runStep(t, "Register a dummy node with a service", func(t *testing.T) {
		req := &structs.RegisterRequest{
			Node:       "node1",
			Address:    "3.4.5.6",
			Datacenter: "dc1",
			Service: &structs.NodeService{
				ID:      "redis1",
				Service: "redis",
				Address: "3.4.5.6",
				Port:    8080,
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		var out struct{}
		require.NoError(t, s1.RPC("Catalog.Register", &req, &out))
	})

	var conn *grpc.ClientConn
	{
		client, builder := newClientWithGRPCResolver(t, func(c *Config) {
			c.Datacenter = "dc2"
			c.PrimaryDatacenter = "dc1"
			c.RPCConfig.EnableStreaming = true
		})
		joinLAN(t, client, s2)
		testrpc.WaitForTestAgent(t, client.RPC, "dc2", testrpc.WithToken("root"))

		pool := agent_grpc.NewClientConnPool(agent_grpc.ClientConnPoolConfig{
			Servers:               builder,
			DialingFromServer:     false,
			DialingFromDatacenter: "dc2",
		})

		conn, err = pool.ClientConn("dc2")
		require.NoError(t, err)
	}

	// Try to use it locally (it should work)
	runStep(t, "token used locally should work", func(t *testing.T) {
		arg := &pbsubscribe.SubscribeRequest{
			Topic:      pbsubscribe.Topic_ServiceHealth,
			Key:        "redis",
			Token:      localToken2.SecretID,
			Datacenter: "dc2",
		}
		event, err := getFirstSubscribeEventOrError(conn, arg)
		require.NoError(t, err)
		require.NotNil(t, event)

		// Make sure that the token-restore defer works.
		require.Equal(t, localToken2.SecretID, arg.Token, "token should not be stripped")
	})

	runStep(t, "token used remotely should not work", func(t *testing.T) {
		arg := &pbsubscribe.SubscribeRequest{
			Topic:      pbsubscribe.Topic_ServiceHealth,
			Key:        "redis",
			Token:      localToken2.SecretID,
			Datacenter: "dc1",
		}

		event, err := getFirstSubscribeEventOrError(conn, arg)

		// NOTE: the subscription endpoint is a filtering style instead of a
		// hard-fail style, so when the token isn't present 100% of the data is
		// filtered out, leading to a stream with an empty snapshot.
		require.NoError(t, err)
		require.IsType(t, &pbsubscribe.Event_EndOfSnapshot{}, event.Payload)
		require.True(t, event.Payload.(*pbsubscribe.Event_EndOfSnapshot).EndOfSnapshot)
	})

	runStep(t, "update anonymous token to read services", func(t *testing.T) {
		tokenUpsertReq := structs.ACLTokenSetRequest{
			Datacenter: "dc1",
			ACLToken: structs.ACLToken{
				AccessorID: structs.ACLTokenAnonymousID,
				Policies: []structs.ACLTokenPolicyLink{
					{ID: policy.ID},
				},
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		token := structs.ACLToken{}
		err = msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &tokenUpsertReq, &token)
		require.NoError(t, err)
		require.NotEmpty(t, token.SecretID)
	})

	runStep(t, "token used remotely should fall back on the anonymous token now", func(t *testing.T) {
		arg := &pbsubscribe.SubscribeRequest{
			Topic:      pbsubscribe.Topic_ServiceHealth,
			Key:        "redis",
			Token:      localToken2.SecretID,
			Datacenter: "dc1",
		}

		event, err := getFirstSubscribeEventOrError(conn, arg)
		require.NoError(t, err)
		require.NotNil(t, event)

		// Now that we can read data, we should get a snapshot with just
		// instances of the "redis" service.
		require.IsType(t, &pbsubscribe.Event_ServiceHealth{}, event.Payload)
		esh := event.Payload.(*pbsubscribe.Event_ServiceHealth)

		require.Equal(t, pbsubscribe.CatalogOp_Register, esh.ServiceHealth.Op)
		csn := esh.ServiceHealth.CheckServiceNode

		require.NotNil(t, csn)
		require.NotNil(t, csn.Node)
		require.Equal(t, "node1", csn.Node.Node)
		require.Equal(t, "3.4.5.6", csn.Node.Address)
		require.NotNil(t, csn.Service)
		require.Equal(t, "redis1", csn.Service.ID)
		require.Equal(t, "redis", csn.Service.Service)

		// Make sure that the token-restore defer works.
		require.Equal(t, localToken2.SecretID, arg.Token, "token should not be stripped")
	})
}

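// canRetry is what the RPC forwarding/hold path consults to decide whether a
// failed request is safe to retry: chunking resubmits and no-leader errors
// always are, while io.EOF is retryable only for read requests.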
func TestCanRetry(t *testing.T) {
	type testCase struct {
		name     string
		req      structs.RPCInfo
		err      error
		expected bool
	}

	run := func(t *testing.T, tc testCase) {
		require.Equal(t, tc.expected, canRetry(tc.req, tc.err))
	}

	var testCases = []testCase{
		{
			name:     "unexpected error",
			err:      fmt.Errorf("some arbitrary error"),
			expected: false,
		},
		{
			name:     "chunking error",
			err:      fmt.Errorf("some wrapping: %w", ErrChunkingResubmit),
			expected: true,
		},
		{
			name:     "no leader error",
			err:      fmt.Errorf("some wrapping: %w", structs.ErrNoLeader),
			expected: true,
		},
		{
			name:     "EOF on read request",
			req:      isReadRequest{},
			err:      io.EOF,
			expected: true,
		},
		{
			name:     "EOF on write request",
			err:      io.EOF,
			expected: false,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			run(t, tc)
		})
	}
}

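// isReadRequest stubs structs.RPCInfo so that IsRead reports true, letting
// TestCanRetry exercise the read-request retry path.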
type isReadRequest struct {
	structs.RPCInfo
}

func (r isReadRequest) IsRead() bool {
	return true
}

func TestRPC_AuthorizeRaftRPC(t *testing.T) {
	caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
	require.NoError(t, err)

	caSigner, err := tlsutil.ParseSigner(caPK)
	require.NoError(t, err)

	dir := testutil.TempDir(t, "certs")
	err = ioutil.WriteFile(filepath.Join(dir, "ca.pem"), []byte(caPEM), 0600)
	require.NoError(t, err)

	intermediatePEM, intermediatePK, err := tlsutil.GenerateCert(tlsutil.CertOpts{IsCA: true, CA: caPEM, Signer: caSigner, Days: 5})
	require.NoError(t, err)

	err = ioutil.WriteFile(filepath.Join(dir, "intermediate.pem"), []byte(intermediatePEM), 0600)
	require.NoError(t, err)

	newCert := func(t *testing.T, caPEM, pk, node, name string) {
		t.Helper()

		signer, err := tlsutil.ParseSigner(pk)
		require.NoError(t, err)

		pem, key, err := tlsutil.GenerateCert(tlsutil.CertOpts{
			Signer:      signer,
			CA:          caPEM,
			Name:        name,
			Days:        5,
			DNSNames:    []string{node + "." + name, name, "localhost"},
			ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
		})
		require.NoError(t, err)

		err = ioutil.WriteFile(filepath.Join(dir, node+"-"+name+".pem"), []byte(pem), 0600)
		require.NoError(t, err)
		err = ioutil.WriteFile(filepath.Join(dir, node+"-"+name+".key"), []byte(key), 0600)
		require.NoError(t, err)
	}

	newCert(t, caPEM, caPK, "srv1", "server.dc1.consul")

	_, connectCApk, err := connect.GeneratePrivateKey()
	require.NoError(t, err)

	_, srv := testServerWithConfig(t, func(c *Config) {
		c.Domain = "consul." // consul. is the default value in agent/config
		c.CAFile = filepath.Join(dir, "ca.pem")
		c.CertFile = filepath.Join(dir, "srv1-server.dc1.consul.pem")
		c.KeyFile = filepath.Join(dir, "srv1-server.dc1.consul.key")
		c.VerifyIncoming = true
		c.VerifyServerHostname = true
		// Enable Auto-Encrypt so that Connect CA roots are added to the
		// tlsutil.Configurator.
		c.AutoEncryptAllowTLS = true
		c.CAConfig = &structs.CAConfiguration{
			ClusterID: connect.TestClusterID,
			Provider:  structs.ConsulCAProvider,
			Config:    map[string]interface{}{"PrivateKey": connectCApk},
		}
	})
	defer srv.Shutdown()

	// Wait for ConnectCA initialization to complete.
	retry.Run(t, func(r *retry.R) {
		_, root := srv.caManager.getCAProvider()
		if root == nil {
			r.Fatal("ConnectCA root is still nil")
		}
	})

	useTLSByte := func(t *testing.T, c *tlsutil.Configurator) net.Conn {
		wrapper := tlsutil.SpecificDC("dc1", c.OutgoingRPCWrapper())
		tlsEnabled := func(_ raft.ServerAddress) bool {
			return true
		}

		rl := NewRaftLayer(nil, nil, wrapper, tlsEnabled)
		conn, err := rl.Dial(raft.ServerAddress(srv.Listener.Addr().String()), 100*time.Millisecond)
		require.NoError(t, err)
		return conn
	}

	useNativeTLS := func(t *testing.T, c *tlsutil.Configurator) net.Conn {
		wrapper := c.OutgoingALPNRPCWrapper()
		dialer := &net.Dialer{Timeout: 100 * time.Millisecond}

		rawConn, err := dialer.Dial("tcp", srv.Listener.Addr().String())
		require.NoError(t, err)

		tlsConn, err := wrapper("dc1", "srv1", pool.ALPN_RPCRaft, rawConn)
		require.NoError(t, err)
		return tlsConn
	}

	setupAgentTLSCert := func(name string) func(t *testing.T) string {
		return func(t *testing.T) string {
			newCert(t, caPEM, caPK, "node1", name)
			return filepath.Join(dir, "node1-"+name)
		}
	}

	setupAgentTLSCertWithIntermediate := func(name string) func(t *testing.T) string {
		return func(t *testing.T) string {
			newCert(t, intermediatePEM, intermediatePK, "node1", name)
			certPrefix := filepath.Join(dir, "node1-"+name)
			f, err := os.OpenFile(certPrefix+".pem", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
			if err != nil {
				t.Fatal(err)
			}
			if _, err := f.Write([]byte(intermediatePEM)); err != nil {
				t.Fatal(err)
			}
			if err := f.Close(); err != nil {
				t.Fatal(err)
			}
			return certPrefix
		}
	}

	setupConnectCACert := func(name string) func(t *testing.T) string {
		return func(t *testing.T) string {
			_, caRoot := srv.caManager.getCAProvider()
			newCert(t, caRoot.RootCert, connectCApk, "node1", name)
			return filepath.Join(dir, "node1-"+name)
		}
	}

	type testCase struct {
		name        string
		conn        func(t *testing.T, c *tlsutil.Configurator) net.Conn
		setupCert   func(t *testing.T) string
		expectError bool
	}

	run := func(t *testing.T, tc testCase) {
		certPath := tc.setupCert(t)

		cfg := tlsutil.Config{
			VerifyOutgoing:       true,
			VerifyServerHostname: true,
			CAFile:               filepath.Join(dir, "ca.pem"),
			CertFile:             certPath + ".pem",
			KeyFile:              certPath + ".key",
			Domain:               "consul",
		}
		c, err := tlsutil.NewConfigurator(cfg, hclog.New(nil))
		require.NoError(t, err)

		_, err = doRaftRPC(tc.conn(t, c), srv.config.NodeName)
		if tc.expectError {
			if !isConnectionClosedError(err) {
				t.Fatalf("expected a connection closed error, got: %v", err)
			}
			return
		}
		require.NoError(t, err)
	}

	var testCases = []testCase{
		{
			name:        "TLS byte with client cert",
			setupCert:   setupAgentTLSCert("client.dc1.consul"),
			conn:        useTLSByte,
			expectError: true,
		},
		{
			name:        "TLS byte with server cert in different DC",
			setupCert:   setupAgentTLSCert("server.dc2.consul"),
			conn:        useTLSByte,
			expectError: true,
		},
		{
			name:      "TLS byte with server cert in same DC",
			setupCert: setupAgentTLSCert("server.dc1.consul"),
			conn:      useTLSByte,
		},
		{
			name:      "TLS byte with server cert in same DC and with unknown intermediate",
			setupCert: setupAgentTLSCertWithIntermediate("server.dc1.consul"),
			conn:      useTLSByte,
		},
		{
			name:        "TLS byte with ConnectCA leaf cert",
			setupCert:   setupConnectCACert("server.dc1.consul"),
			conn:        useTLSByte,
			expectError: true,
		},
		{
			name:        "native TLS with client cert",
			setupCert:   setupAgentTLSCert("client.dc1.consul"),
			conn:        useNativeTLS,
			expectError: true,
		},
		{
			name:        "native TLS with server cert in different DC",
			setupCert:   setupAgentTLSCert("server.dc2.consul"),
			conn:        useNativeTLS,
			expectError: true,
		},
		{
			name:      "native TLS with server cert in same DC",
			setupCert: setupAgentTLSCert("server.dc1.consul"),
			conn:      useNativeTLS,
		},
		{
			name:        "native TLS with ConnectCA leaf cert",
			setupCert:   setupConnectCACert("server.dc1.consul"),
			conn:        useNativeTLS,
			expectError: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			run(t, tc)
		})
	}
}

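// doRaftRPC sends a single AppendEntries RPC over conn and returns the
// decoded response; the test above uses it to probe whether the server
// authorizes the connection for Raft traffic at all.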
func doRaftRPC(conn net.Conn, leader string) (raft.AppendEntriesResponse, error) {
	var resp raft.AppendEntriesResponse

	var term uint64 = 0xc
	a := raft.AppendEntriesRequest{
		RPCHeader:         raft.RPCHeader{ProtocolVersion: 3},
		Term:              0,
		Leader:            []byte(leader),
		PrevLogEntry:      0,
		PrevLogTerm:       term,
		LeaderCommitIndex: 50,
	}

	if err := appendEntries(conn, a, &resp); err != nil {
		return resp, err
	}
	return resp, nil
}

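// appendEntries frames the request the way raft's NetworkTransport does: one
// RPC-type byte (rpcAppendEntries) followed by the msgpack-encoded request,
// then reads back the msgpack-encoded error string and response.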
func appendEntries(conn net.Conn, req raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error {
	w := bufio.NewWriter(conn)
	enc := codec.NewEncoder(w, &codec.MsgpackHandle{})

	const rpcAppendEntries = 0
	if err := w.WriteByte(rpcAppendEntries); err != nil {
		return fmt.Errorf("failed to write raft-RPC byte: %w", err)
	}

	if err := enc.Encode(req); err != nil {
		return fmt.Errorf("failed to send append entries RPC: %w", err)
	}
	if err := w.Flush(); err != nil {
		return fmt.Errorf("failed to flush RPC: %w", err)
	}

	if err := decodeRaftRPCResponse(conn, resp); err != nil {
		return fmt.Errorf("response error: %w", err)
	}
	return nil
}

// copied and modified from raft/net_transport.go
func decodeRaftRPCResponse(conn net.Conn, resp *raft.AppendEntriesResponse) error {
	r := bufio.NewReader(conn)
	dec := codec.NewDecoder(r, &codec.MsgpackHandle{})

	var rpcError string
	if err := dec.Decode(&rpcError); err != nil {
		return fmt.Errorf("failed to decode response error: %w", err)
	}
	if err := dec.Decode(resp); err != nil {
		return fmt.Errorf("failed to decode response: %w", err)
	}
	if rpcError != "" {
		return fmt.Errorf("rpc error: %v", rpcError)
	}
	return nil
}

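// isConnectionClosedError reports whether err looks like the remote end
// closed the connection (io.EOF or a "connection reset by peer").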
func isConnectionClosedError(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, io.EOF):
		return true
	case strings.Contains(err.Error(), "connection reset by peer"):
		return true
	default:
		return false
	}
}

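// getFirstSubscribeEventOrError opens a Subscribe stream for req and returns
// the first event received, translating a clean io.EOF into a nil event.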
func getFirstSubscribeEventOrError(conn *grpc.ClientConn, req *pbsubscribe.SubscribeRequest) (*pbsubscribe.Event, error) {
	streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	handle, err := streamClient.Subscribe(ctx, req)
	if err != nil {
		return nil, err
	}

	event, err := handle.Recv()
	if err == io.EOF {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return event, nil
}