1package consul
2
3import (
4	"fmt"
5	"math"
6	"math/rand"
7	"net/rpc"
8	"os"
9	"strings"
10	"testing"
11	"time"
12
13	"github.com/hashicorp/consul/acl"
14	"github.com/hashicorp/consul/agent/structs"
15	"github.com/hashicorp/consul/lib"
16	"github.com/hashicorp/consul/testrpc"
17	"github.com/hashicorp/consul/testutil/retry"
18	"github.com/hashicorp/net-rpc-msgpackrpc"
19	"github.com/hashicorp/serf/coordinate"
20	"github.com/pascaldekloe/goe/verify"
21)
22
23// generateRandomCoordinate creates a random coordinate. This mucks with the
24// underlying structure directly, so it's not really useful for any particular
25// position in the network, but it's a good payload to send through to make
26// sure things come out the other side or get stored correctly.
27func generateRandomCoordinate() *coordinate.Coordinate {
28	config := coordinate.DefaultConfig()
29	coord := coordinate.NewCoordinate(config)
30	for i := range coord.Vec {
31		coord.Vec[i] = rand.NormFloat64()
32	}
33	coord.Error = rand.NormFloat64()
34	coord.Adjustment = rand.NormFloat64()
35	return coord
36}
37
38func TestCoordinate_Update(t *testing.T) {
39	t.Parallel()
40	dir1, s1 := testServerWithConfig(t, func(c *Config) {
41		c.CoordinateUpdatePeriod = 500 * time.Millisecond
42		c.CoordinateUpdateBatchSize = 5
43		c.CoordinateUpdateMaxBatches = 2
44	})
45	defer os.RemoveAll(dir1)
46	defer s1.Shutdown()
47
48	codec := rpcClient(t, s1)
49	defer codec.Close()
50	testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
51
52	// Register some nodes.
53	nodes := []string{"node1", "node2"}
54	if err := registerNodes(nodes, codec); err != nil {
55		t.Fatal(err)
56	}
57
58	// Send an update for the first node.
59	arg1 := structs.CoordinateUpdateRequest{
60		Datacenter: "dc1",
61		Node:       "node1",
62		Coord:      generateRandomCoordinate(),
63	}
64	var out struct{}
65	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil {
66		t.Fatalf("err: %v", err)
67	}
68
69	// Send an update for the second node.
70	arg2 := structs.CoordinateUpdateRequest{
71		Datacenter: "dc1",
72		Node:       "node2",
73		Coord:      generateRandomCoordinate(),
74	}
75	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
76		t.Fatalf("err: %v", err)
77	}
78
79	// Make sure the updates did not yet apply because the update period
80	// hasn't expired.
81	state := s1.fsm.State()
82	_, c, err := state.Coordinate("node1", nil)
83	if err != nil {
84		t.Fatalf("err: %v", err)
85	}
86	verify.Values(t, "", c, lib.CoordinateSet{})
87
88	_, c, err = state.Coordinate("node2", nil)
89	if err != nil {
90		t.Fatalf("err: %v", err)
91	}
92	verify.Values(t, "", c, lib.CoordinateSet{})
93
94	// Send another update for the second node. It should take precedence
95	// since there will be two updates in the same batch.
96	arg2.Coord = generateRandomCoordinate()
97	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
98		t.Fatalf("err: %v", err)
99	}
100
101	// Wait a while and the updates should get picked up.
102	time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
103	_, c, err = state.Coordinate("node1", nil)
104	if err != nil {
105		t.Fatalf("err: %v", err)
106	}
107	expected := lib.CoordinateSet{
108		"": arg1.Coord,
109	}
110	verify.Values(t, "", c, expected)
111
112	_, c, err = state.Coordinate("node2", nil)
113	if err != nil {
114		t.Fatalf("err: %v", err)
115	}
116	expected = lib.CoordinateSet{
117		"": arg2.Coord,
118	}
119	verify.Values(t, "", c, expected)
120
121	// Register a bunch of additional nodes.
122	spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1
123	for i := 0; i < spamLen; i++ {
124		req := structs.RegisterRequest{
125			Datacenter: "dc1",
126			Node:       fmt.Sprintf("bogusnode%d", i),
127			Address:    "127.0.0.1",
128		}
129		var reply struct{}
130		if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
131			t.Fatalf("err: %v", err)
132		}
133	}
134
135	// Now spam some coordinate updates and make sure it starts throwing
136	// them away if they exceed the batch allowance. Note we have to make
137	// unique names since these are held in map by node name.
138	for i := 0; i < spamLen; i++ {
139		arg1.Node = fmt.Sprintf("bogusnode%d", i)
140		arg1.Coord = generateRandomCoordinate()
141		if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil {
142			t.Fatalf("err: %v", err)
143		}
144	}
145
146	// Wait a little while for the batch routine to run, then make sure
147	// exactly one of the updates got dropped (we won't know which one).
148	time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
149	numDropped := 0
150	for i := 0; i < spamLen; i++ {
151		_, c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i), nil)
152		if err != nil {
153			t.Fatalf("err: %v", err)
154		}
155		if len(c) == 0 {
156			numDropped++
157		}
158	}
159	if numDropped != 1 {
160		t.Fatalf("wrong number of coordinates dropped, %d != 1", numDropped)
161	}
162
163	// Send a coordinate with a NaN to make sure that we don't absorb that
164	// into the database.
165	arg2.Coord.Vec[0] = math.NaN()
166	err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out)
167	if err == nil || !strings.Contains(err.Error(), "invalid coordinate") {
168		t.Fatalf("should have failed with an error, got %v", err)
169	}
170
171	// Finally, send a coordinate with the wrong dimensionality to make sure
172	// there are no panics, and that it gets rejected.
173	arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec))
174	err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out)
175	if err == nil || !strings.Contains(err.Error(), "incompatible coordinate") {
176		t.Fatalf("should have failed with an error, got %v", err)
177	}
178}
179
180func TestCoordinate_Update_ACLDeny(t *testing.T) {
181	t.Parallel()
182	dir1, s1 := testServerWithConfig(t, func(c *Config) {
183		c.ACLDatacenter = "dc1"
184		c.ACLMasterToken = "root"
185		c.ACLDefaultPolicy = "deny"
186		c.ACLEnforceVersion8 = false
187	})
188	defer os.RemoveAll(dir1)
189	defer s1.Shutdown()
190	codec := rpcClient(t, s1)
191	defer codec.Close()
192
193	testrpc.WaitForLeader(t, s1.RPC, "dc1")
194
195	// Register some nodes.
196	nodes := []string{"node1", "node2"}
197	if err := registerNodes(nodes, codec); err != nil {
198		t.Fatal(err)
199	}
200
201	// Send an update for the first node. This should go through since we
202	// don't have version 8 ACLs enforced yet.
203	req := structs.CoordinateUpdateRequest{
204		Datacenter: "dc1",
205		Node:       "node1",
206		Coord:      generateRandomCoordinate(),
207	}
208	var out struct{}
209	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
210		t.Fatalf("err: %v", err)
211	}
212
213	// Now turn on version 8 enforcement and try again.
214	s1.config.ACLEnforceVersion8 = true
215	err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out)
216	if !acl.IsErrPermissionDenied(err) {
217		t.Fatalf("err: %v", err)
218	}
219
220	// Create an ACL that can write to the node.
221	arg := structs.ACLRequest{
222		Datacenter: "dc1",
223		Op:         structs.ACLSet,
224		ACL: structs.ACL{
225			Name: "User token",
226			Type: structs.ACLTypeClient,
227			Rules: `
228node "node1" {
229	policy = "write"
230}
231`,
232		},
233		WriteRequest: structs.WriteRequest{Token: "root"},
234	}
235	var id string
236	if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
237		t.Fatalf("err: %v", err)
238	}
239
240	// With the token, it should now go through.
241	req.Token = id
242	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
243		t.Fatalf("err: %v", err)
244	}
245
246	// But it should be blocked for the other node.
247	req.Node = "node2"
248	err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out)
249	if !acl.IsErrPermissionDenied(err) {
250		t.Fatalf("err: %v", err)
251	}
252}
253
254func TestCoordinate_ListDatacenters(t *testing.T) {
255	t.Parallel()
256	dir1, s1 := testServer(t)
257	defer os.RemoveAll(dir1)
258	defer s1.Shutdown()
259	codec := rpcClient(t, s1)
260	defer codec.Close()
261
262	testrpc.WaitForLeader(t, s1.RPC, "dc1")
263
264	// It's super hard to force the Serfs into a known configuration of
265	// coordinates, so the best we can do is make sure our own DC shows
266	// up in the list with the proper coordinates. The guts of the algorithm
267	// are extensively tested in rtt_test.go using a mock database.
268	var out []structs.DatacenterMap
269	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListDatacenters", struct{}{}, &out); err != nil {
270		t.Fatalf("err: %v", err)
271	}
272	if len(out) != 1 ||
273		out[0].Datacenter != "dc1" ||
274		len(out[0].Coordinates) != 1 ||
275		out[0].Coordinates[0].Node != s1.config.NodeName {
276		t.Fatalf("bad: %v", out)
277	}
278	c, err := s1.serfWAN.GetCoordinate()
279	if err != nil {
280		t.Fatalf("bad: %v", err)
281	}
282	verify.Values(t, "", c, out[0].Coordinates[0].Coord)
283}
284
285func TestCoordinate_ListNodes(t *testing.T) {
286	t.Parallel()
287	dir1, s1 := testServer(t)
288	defer os.RemoveAll(dir1)
289	defer s1.Shutdown()
290
291	codec := rpcClient(t, s1)
292	defer codec.Close()
293	testrpc.WaitForLeader(t, s1.RPC, "dc1")
294
295	// Register some nodes.
296	nodes := []string{"foo", "bar", "baz"}
297	if err := registerNodes(nodes, codec); err != nil {
298		t.Fatal(err)
299	}
300
301	// Send coordinate updates for a few nodes.
302	arg1 := structs.CoordinateUpdateRequest{
303		Datacenter: "dc1",
304		Node:       "foo",
305		Coord:      generateRandomCoordinate(),
306	}
307	var out struct{}
308	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil {
309		t.Fatalf("err: %v", err)
310	}
311
312	arg2 := structs.CoordinateUpdateRequest{
313		Datacenter: "dc1",
314		Node:       "bar",
315		Coord:      generateRandomCoordinate(),
316	}
317	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
318		t.Fatalf("err: %v", err)
319	}
320
321	arg3 := structs.CoordinateUpdateRequest{
322		Datacenter: "dc1",
323		Node:       "baz",
324		Coord:      generateRandomCoordinate(),
325	}
326	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil {
327		t.Fatalf("err: %v", err)
328	}
329	// Now query back for all the nodes.
330	retry.Run(t, func(r *retry.R) {
331		arg := structs.DCSpecificRequest{
332			Datacenter: "dc1",
333		}
334		resp := structs.IndexedCoordinates{}
335		if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil {
336			r.Fatalf("err: %v", err)
337		}
338		if len(resp.Coordinates) != 3 ||
339			resp.Coordinates[0].Node != "bar" ||
340			resp.Coordinates[1].Node != "baz" ||
341			resp.Coordinates[2].Node != "foo" {
342			r.Fatalf("bad: %v", resp.Coordinates)
343		}
344		verify.Values(t, "", resp.Coordinates[0].Coord, arg2.Coord) // bar
345		verify.Values(t, "", resp.Coordinates[1].Coord, arg3.Coord) // baz
346		verify.Values(t, "", resp.Coordinates[2].Coord, arg1.Coord) // foo
347	})
348}
349
350func TestCoordinate_ListNodes_ACLFilter(t *testing.T) {
351	t.Parallel()
352	dir1, s1 := testServerWithConfig(t, func(c *Config) {
353		c.ACLDatacenter = "dc1"
354		c.ACLMasterToken = "root"
355		c.ACLDefaultPolicy = "deny"
356		c.ACLEnforceVersion8 = false
357	})
358	defer os.RemoveAll(dir1)
359	defer s1.Shutdown()
360	codec := rpcClient(t, s1)
361	defer codec.Close()
362
363	testrpc.WaitForLeader(t, s1.RPC, "dc1")
364
365	// Register some nodes.
366	nodes := []string{"foo", "bar", "baz"}
367	for _, node := range nodes {
368		req := structs.RegisterRequest{
369			Datacenter: "dc1",
370			Node:       node,
371			Address:    "127.0.0.1",
372			WriteRequest: structs.WriteRequest{
373				Token: "root",
374			},
375		}
376		var reply struct{}
377		if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
378			t.Fatalf("err: %v", err)
379		}
380	}
381
382	// Send coordinate updates for a few nodes.
383	arg1 := structs.CoordinateUpdateRequest{
384		Datacenter: "dc1",
385		Node:       "foo",
386		Coord:      generateRandomCoordinate(),
387		WriteRequest: structs.WriteRequest{
388			Token: "root",
389		},
390	}
391	var out struct{}
392	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil {
393		t.Fatalf("err: %v", err)
394	}
395
396	arg2 := structs.CoordinateUpdateRequest{
397		Datacenter: "dc1",
398		Node:       "bar",
399		Coord:      generateRandomCoordinate(),
400		WriteRequest: structs.WriteRequest{
401			Token: "root",
402		},
403	}
404	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
405		t.Fatalf("err: %v", err)
406	}
407
408	arg3 := structs.CoordinateUpdateRequest{
409		Datacenter: "dc1",
410		Node:       "baz",
411		Coord:      generateRandomCoordinate(),
412		WriteRequest: structs.WriteRequest{
413			Token: "root",
414		},
415	}
416	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil {
417		t.Fatalf("err: %v", err)
418	}
419	// Wait for all the coordinate updates to apply. Since we aren't
420	// enforcing version 8 ACLs, this should also allow us to read
421	// everything back without a token.
422	retry.Run(t, func(r *retry.R) {
423		arg := structs.DCSpecificRequest{
424			Datacenter: "dc1",
425		}
426		resp := structs.IndexedCoordinates{}
427		if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil {
428			r.Fatalf("err: %v", err)
429		}
430		if got, want := len(resp.Coordinates), 3; got != want {
431			r.Fatalf("got %d coordinates want %d", got, want)
432		}
433	})
434
435	// Now that we've waited for the batch processing to ingest the
436	// coordinates we can do the rest of the requests without the loop. We
437	// will start by turning on version 8 ACL support which should block
438	// everything.
439	s1.config.ACLEnforceVersion8 = true
440	arg := structs.DCSpecificRequest{
441		Datacenter: "dc1",
442	}
443	resp := structs.IndexedCoordinates{}
444	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil {
445		t.Fatalf("err: %v", err)
446	}
447	if len(resp.Coordinates) != 0 {
448		t.Fatalf("bad: %#v", resp.Coordinates)
449	}
450
451	// Create an ACL that can read one of the nodes.
452	var id string
453	{
454		req := structs.ACLRequest{
455			Datacenter: "dc1",
456			Op:         structs.ACLSet,
457			ACL: structs.ACL{
458				Name: "User token",
459				Type: structs.ACLTypeClient,
460				Rules: `
461node "foo" {
462	policy = "read"
463}
464`,
465			},
466			WriteRequest: structs.WriteRequest{Token: "root"},
467		}
468		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &id); err != nil {
469			t.Fatalf("err: %v", err)
470		}
471	}
472
473	// With the token, it should now go through.
474	arg.Token = id
475	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil {
476		t.Fatalf("err: %v", err)
477	}
478	if len(resp.Coordinates) != 1 || resp.Coordinates[0].Node != "foo" {
479		t.Fatalf("bad: %#v", resp.Coordinates)
480	}
481}
482
483func TestCoordinate_Node(t *testing.T) {
484	t.Parallel()
485	dir1, s1 := testServer(t)
486	defer os.RemoveAll(dir1)
487	defer s1.Shutdown()
488
489	codec := rpcClient(t, s1)
490	defer codec.Close()
491	testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
492
493	// Register some nodes.
494	nodes := []string{"foo", "bar"}
495	if err := registerNodes(nodes, codec); err != nil {
496		t.Fatal(err)
497	}
498
499	// Send coordinate updates for each node.
500	arg1 := structs.CoordinateUpdateRequest{
501		Datacenter: "dc1",
502		Node:       "foo",
503		Coord:      generateRandomCoordinate(),
504	}
505	var out struct{}
506	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil {
507		t.Fatalf("err: %v", err)
508	}
509
510	arg2 := structs.CoordinateUpdateRequest{
511		Datacenter: "dc1",
512		Node:       "bar",
513		Coord:      generateRandomCoordinate(),
514	}
515	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
516		t.Fatalf("err: %v", err)
517	}
518
519	// Now query back for a specific node (make sure we only get coordinates for foo).
520	retry.Run(t, func(r *retry.R) {
521		arg := structs.NodeSpecificRequest{
522			Node:       "foo",
523			Datacenter: "dc1",
524		}
525		resp := structs.IndexedCoordinates{}
526		if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil {
527			r.Fatalf("err: %v", err)
528		}
529		if len(resp.Coordinates) != 1 ||
530			resp.Coordinates[0].Node != "foo" {
531			r.Fatalf("bad: %v", resp.Coordinates)
532		}
533		verify.Values(t, "", resp.Coordinates[0].Coord, arg1.Coord) // foo
534	})
535}
536
537func TestCoordinate_Node_ACLDeny(t *testing.T) {
538	t.Parallel()
539	dir1, s1 := testServerWithConfig(t, func(c *Config) {
540		c.ACLDatacenter = "dc1"
541		c.ACLMasterToken = "root"
542		c.ACLDefaultPolicy = "deny"
543		c.ACLEnforceVersion8 = false
544	})
545	defer os.RemoveAll(dir1)
546	defer s1.Shutdown()
547	codec := rpcClient(t, s1)
548	defer codec.Close()
549
550	testrpc.WaitForLeader(t, s1.RPC, "dc1")
551
552	// Register some nodes.
553	nodes := []string{"node1", "node2"}
554	if err := registerNodes(nodes, codec); err != nil {
555		t.Fatal(err)
556	}
557
558	coord := generateRandomCoordinate()
559	req := structs.CoordinateUpdateRequest{
560		Datacenter: "dc1",
561		Node:       "node1",
562		Coord:      coord,
563	}
564	var out struct{}
565	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
566		t.Fatalf("err: %v", err)
567	}
568
569	// Try a read for the first node. This should go through since we
570	// don't have version 8 ACLs enforced yet.
571	arg := structs.NodeSpecificRequest{
572		Node:       "node1",
573		Datacenter: "dc1",
574	}
575	resp := structs.IndexedCoordinates{}
576	retry.Run(t, func(r *retry.R) {
577		if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil {
578			r.Fatalf("err: %v", err)
579		}
580		if len(resp.Coordinates) != 1 ||
581			resp.Coordinates[0].Node != "node1" {
582			r.Fatalf("bad: %v", resp.Coordinates)
583		}
584		verify.Values(t, "", resp.Coordinates[0].Coord, coord)
585	})
586
587	// Now turn on version 8 enforcement and try again.
588	s1.config.ACLEnforceVersion8 = true
589	err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp)
590	if !acl.IsErrPermissionDenied(err) {
591		t.Fatalf("err: %v", err)
592	}
593
594	// Create an ACL that can read from the node.
595	aclReq := structs.ACLRequest{
596		Datacenter: "dc1",
597		Op:         structs.ACLSet,
598		ACL: structs.ACL{
599			Name: "User token",
600			Type: structs.ACLTypeClient,
601			Rules: `
602node "node1" {
603	policy = "read"
604}
605`,
606		},
607		WriteRequest: structs.WriteRequest{Token: "root"},
608	}
609	var id string
610	if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &aclReq, &id); err != nil {
611		t.Fatalf("err: %v", err)
612	}
613
614	// With the token, it should now go through.
615	arg.Token = id
616	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil {
617		t.Fatalf("err: %v", err)
618	}
619
620	// But it should be blocked for the other node.
621	arg.Node = "node2"
622	err = msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp)
623	if !acl.IsErrPermissionDenied(err) {
624		t.Fatalf("err: %v", err)
625	}
626}
627
628func registerNodes(nodes []string, codec rpc.ClientCodec) error {
629	for _, node := range nodes {
630		req := structs.RegisterRequest{
631			Datacenter: "dc1",
632			Node:       node,
633			Address:    "127.0.0.1",
634		}
635		var reply struct{}
636		if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
637			return err
638		}
639	}
640
641	return nil
642}
643