1package consul
2
3import (
4	"fmt"
5	"os"
6	"strings"
7	"testing"
8	"time"
9
10	"github.com/hashicorp/consul/acl"
11	"github.com/hashicorp/consul/agent/structs"
12	"github.com/hashicorp/consul/lib/freeport"
13	"github.com/hashicorp/consul/testrpc"
14	"github.com/hashicorp/net-rpc-msgpackrpc"
15	"github.com/hashicorp/raft"
16	"github.com/pascaldekloe/goe/verify"
17)
18
19func TestOperator_RaftGetConfiguration(t *testing.T) {
20	t.Parallel()
21	dir1, s1 := testServer(t)
22	defer os.RemoveAll(dir1)
23	defer s1.Shutdown()
24	codec := rpcClient(t, s1)
25	defer codec.Close()
26
27	testrpc.WaitForLeader(t, s1.RPC, "dc1")
28
29	arg := structs.DCSpecificRequest{
30		Datacenter: "dc1",
31	}
32	var reply structs.RaftConfigurationResponse
33	if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply); err != nil {
34		t.Fatalf("err: %v", err)
35	}
36
37	future := s1.raft.GetConfiguration()
38	if err := future.Error(); err != nil {
39		t.Fatalf("err: %v", err)
40	}
41	if len(future.Configuration().Servers) != 1 {
42		t.Fatalf("bad: %v", future.Configuration().Servers)
43	}
44	me := future.Configuration().Servers[0]
45	expected := structs.RaftConfigurationResponse{
46		Servers: []*structs.RaftServer{
47			&structs.RaftServer{
48				ID:              me.ID,
49				Node:            s1.config.NodeName,
50				Address:         me.Address,
51				Leader:          true,
52				Voter:           true,
53				ProtocolVersion: "3",
54			},
55		},
56		Index: future.Index(),
57	}
58	verify.Values(t, "", reply, expected)
59}
60
61func TestOperator_RaftGetConfiguration_ACLDeny(t *testing.T) {
62	t.Parallel()
63	dir1, s1 := testServerWithConfig(t, func(c *Config) {
64		c.ACLDatacenter = "dc1"
65		c.ACLMasterToken = "root"
66		c.ACLDefaultPolicy = "deny"
67	})
68	defer os.RemoveAll(dir1)
69	defer s1.Shutdown()
70	codec := rpcClient(t, s1)
71	defer codec.Close()
72
73	testrpc.WaitForLeader(t, s1.RPC, "dc1")
74
75	// Make a request with no token to make sure it gets denied.
76	arg := structs.DCSpecificRequest{
77		Datacenter: "dc1",
78	}
79	var reply structs.RaftConfigurationResponse
80	err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply)
81	if !acl.IsErrPermissionDenied(err) {
82		t.Fatalf("err: %v", err)
83	}
84
85	// Create an ACL with operator read permissions.
86	var token string
87	{
88		var rules = `
89                    operator = "read"
90                `
91
92		req := structs.ACLRequest{
93			Datacenter: "dc1",
94			Op:         structs.ACLSet,
95			ACL: structs.ACL{
96				Name:  "User token",
97				Type:  structs.ACLTypeClient,
98				Rules: rules,
99			},
100			WriteRequest: structs.WriteRequest{Token: "root"},
101		}
102		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
103			t.Fatalf("err: %v", err)
104		}
105	}
106
107	// Now it should go through.
108	arg.Token = token
109	if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply); err != nil {
110		t.Fatalf("err: %v", err)
111	}
112
113	future := s1.raft.GetConfiguration()
114	if err := future.Error(); err != nil {
115		t.Fatalf("err: %v", err)
116	}
117	if len(future.Configuration().Servers) != 1 {
118		t.Fatalf("bad: %v", future.Configuration().Servers)
119	}
120	me := future.Configuration().Servers[0]
121	expected := structs.RaftConfigurationResponse{
122		Servers: []*structs.RaftServer{
123			&structs.RaftServer{
124				ID:              me.ID,
125				Node:            s1.config.NodeName,
126				Address:         me.Address,
127				Leader:          true,
128				Voter:           true,
129				ProtocolVersion: "3",
130			},
131		},
132		Index: future.Index(),
133	}
134	verify.Values(t, "", reply, expected)
135}
136
137func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
138	t.Parallel()
139	dir1, s1 := testServer(t)
140	defer os.RemoveAll(dir1)
141	defer s1.Shutdown()
142	codec := rpcClient(t, s1)
143	defer codec.Close()
144
145	testrpc.WaitForLeader(t, s1.RPC, "dc1")
146
147	// Try to remove a peer that's not there.
148	arg := structs.RaftRemovePeerRequest{
149		Datacenter: "dc1",
150		Address:    raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", freeport.Get(1)[0])),
151	}
152	var reply struct{}
153	err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
154	if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
155		t.Fatalf("err: %v", err)
156	}
157
158	// Add it manually to Raft.
159	{
160		id := raft.ServerID("fake-node-id")
161		future := s1.raft.AddVoter(id, arg.Address, 0, time.Second)
162		if err := future.Error(); err != nil {
163			t.Fatalf("err: %v", err)
164		}
165	}
166
167	// Make sure it's there.
168	{
169		future := s1.raft.GetConfiguration()
170		if err := future.Error(); err != nil {
171			t.Fatalf("err: %v", err)
172		}
173		configuration := future.Configuration()
174		if len(configuration.Servers) != 2 {
175			t.Fatalf("bad: %v", configuration)
176		}
177	}
178
179	// Remove it, now it should go through.
180	if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply); err != nil {
181		t.Fatalf("err: %v", err)
182	}
183
184	// Make sure it's not there.
185	{
186		future := s1.raft.GetConfiguration()
187		if err := future.Error(); err != nil {
188			t.Fatalf("err: %v", err)
189		}
190		configuration := future.Configuration()
191		if len(configuration.Servers) != 1 {
192			t.Fatalf("bad: %v", configuration)
193		}
194	}
195}
196
197func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
198	t.Parallel()
199	dir1, s1 := testServerWithConfig(t, func(c *Config) {
200		c.ACLDatacenter = "dc1"
201		c.ACLMasterToken = "root"
202		c.ACLDefaultPolicy = "deny"
203	})
204	defer os.RemoveAll(dir1)
205	defer s1.Shutdown()
206	codec := rpcClient(t, s1)
207	defer codec.Close()
208
209	testrpc.WaitForLeader(t, s1.RPC, "dc1")
210
211	// Make a request with no token to make sure it gets denied.
212	arg := structs.RaftRemovePeerRequest{
213		Datacenter: "dc1",
214		Address:    raft.ServerAddress(s1.config.RPCAddr.String()),
215	}
216	var reply struct{}
217	err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
218	if !acl.IsErrPermissionDenied(err) {
219		t.Fatalf("err: %v", err)
220	}
221
222	// Create an ACL with operator write permissions.
223	var token string
224	{
225		var rules = `
226                    operator = "write"
227                `
228
229		req := structs.ACLRequest{
230			Datacenter: "dc1",
231			Op:         structs.ACLSet,
232			ACL: structs.ACL{
233				Name:  "User token",
234				Type:  structs.ACLTypeClient,
235				Rules: rules,
236			},
237			WriteRequest: structs.WriteRequest{Token: "root"},
238		}
239		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
240			t.Fatalf("err: %v", err)
241		}
242	}
243
244	// Now it should kick back for being an invalid config, which means it
245	// tried to do the operation.
246	arg.Token = token
247	err = msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
248	if err == nil || !strings.Contains(err.Error(), "at least one voter") {
249		t.Fatalf("err: %v", err)
250	}
251}
252
253func TestOperator_RaftRemovePeerByID(t *testing.T) {
254	t.Parallel()
255	dir1, s1 := testServerWithConfig(t, func(c *Config) {
256		c.RaftConfig.ProtocolVersion = 3
257	})
258	defer os.RemoveAll(dir1)
259	defer s1.Shutdown()
260	codec := rpcClient(t, s1)
261	defer codec.Close()
262
263	testrpc.WaitForLeader(t, s1.RPC, "dc1")
264
265	// Try to remove a peer that's not there.
266	arg := structs.RaftRemovePeerRequest{
267		Datacenter: "dc1",
268		ID:         raft.ServerID("e35bde83-4e9c-434f-a6ef-453f44ee21ea"),
269	}
270	var reply struct{}
271	err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
272	if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
273		t.Fatalf("err: %v", err)
274	}
275
276	// Add it manually to Raft.
277	{
278		future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", freeport.Get(1)[0])), 0, 0)
279		if err := future.Error(); err != nil {
280			t.Fatalf("err: %v", err)
281		}
282	}
283
284	// Make sure it's there.
285	{
286		future := s1.raft.GetConfiguration()
287		if err := future.Error(); err != nil {
288			t.Fatalf("err: %v", err)
289		}
290		configuration := future.Configuration()
291		if len(configuration.Servers) != 2 {
292			t.Fatalf("bad: %v", configuration)
293		}
294	}
295
296	// Remove it, now it should go through.
297	if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply); err != nil {
298		t.Fatalf("err: %v", err)
299	}
300
301	// Make sure it's not there.
302	{
303		future := s1.raft.GetConfiguration()
304		if err := future.Error(); err != nil {
305			t.Fatalf("err: %v", err)
306		}
307		configuration := future.Configuration()
308		if len(configuration.Servers) != 1 {
309			t.Fatalf("bad: %v", configuration)
310		}
311	}
312}
313
314func TestOperator_RaftRemovePeerByID_ACLDeny(t *testing.T) {
315	t.Parallel()
316	dir1, s1 := testServerWithConfig(t, func(c *Config) {
317		c.ACLDatacenter = "dc1"
318		c.ACLMasterToken = "root"
319		c.ACLDefaultPolicy = "deny"
320		c.RaftConfig.ProtocolVersion = 3
321	})
322	defer os.RemoveAll(dir1)
323	defer s1.Shutdown()
324	codec := rpcClient(t, s1)
325	defer codec.Close()
326
327	testrpc.WaitForLeader(t, s1.RPC, "dc1")
328
329	// Make a request with no token to make sure it gets denied.
330	arg := structs.RaftRemovePeerRequest{
331		Datacenter: "dc1",
332		ID:         raft.ServerID(s1.config.NodeID),
333	}
334	var reply struct{}
335	err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
336	if !acl.IsErrPermissionDenied(err) {
337		t.Fatalf("err: %v", err)
338	}
339
340	// Create an ACL with operator write permissions.
341	var token string
342	{
343		var rules = `
344                    operator = "write"
345                `
346
347		req := structs.ACLRequest{
348			Datacenter: "dc1",
349			Op:         structs.ACLSet,
350			ACL: structs.ACL{
351				Name:  "User token",
352				Type:  structs.ACLTypeClient,
353				Rules: rules,
354			},
355			WriteRequest: structs.WriteRequest{Token: "root"},
356		}
357		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
358			t.Fatalf("err: %v", err)
359		}
360	}
361
362	// Now it should kick back for being an invalid config, which means it
363	// tried to do the operation.
364	arg.Token = token
365	err = msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
366	if err == nil || !strings.Contains(err.Error(), "at least one voter") {
367		t.Fatalf("err: %v", err)
368	}
369}
370