1package nomad
2
3import (
4	"errors"
5	"fmt"
6	"net"
7	"reflect"
8	"strings"
9	"testing"
10	"time"
11
12	memdb "github.com/hashicorp/go-memdb"
13	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
14	vapi "github.com/hashicorp/vault/api"
15	"github.com/kr/pretty"
16	"github.com/stretchr/testify/assert"
17	"github.com/stretchr/testify/require"
18
19	"github.com/hashicorp/nomad/acl"
20	"github.com/hashicorp/nomad/command/agent/consul"
21	"github.com/hashicorp/nomad/helper"
22	"github.com/hashicorp/nomad/helper/uuid"
23	"github.com/hashicorp/nomad/nomad/mock"
24	"github.com/hashicorp/nomad/nomad/state"
25	"github.com/hashicorp/nomad/nomad/structs"
26	"github.com/hashicorp/nomad/testutil"
27)
28
29func TestClientEndpoint_Register(t *testing.T) {
30	t.Parallel()
31	require := require.New(t)
32
33	s1, cleanupS1 := TestServer(t, nil)
34	defer cleanupS1()
35	codec := rpcClient(t, s1)
36	testutil.WaitForLeader(t, s1.RPC)
37
38	// Check that we have no client connections
39	require.Empty(s1.connectedNodes())
40
41	// Create the register request
42	node := mock.Node()
43	req := &structs.NodeRegisterRequest{
44		Node:         node,
45		WriteRequest: structs.WriteRequest{Region: "global"},
46	}
47
48	// Fetch the response
49	var resp structs.GenericResponse
50	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp); err != nil {
51		t.Fatalf("err: %v", err)
52	}
53	if resp.Index == 0 {
54		t.Fatalf("bad index: %d", resp.Index)
55	}
56
57	// Check that we have the client connections
58	nodes := s1.connectedNodes()
59	require.Len(nodes, 1)
60	require.Contains(nodes, node.ID)
61
62	// Check for the node in the FSM
63	state := s1.fsm.State()
64	ws := memdb.NewWatchSet()
65	out, err := state.NodeByID(ws, node.ID)
66	if err != nil {
67		t.Fatalf("err: %v", err)
68	}
69	if out == nil {
70		t.Fatalf("expected node")
71	}
72	if out.CreateIndex != resp.Index {
73		t.Fatalf("index mis-match")
74	}
75	if out.ComputedClass == "" {
76		t.Fatal("ComputedClass not set")
77	}
78
79	// Close the connection and check that we remove the client connections
80	require.Nil(codec.Close())
81	testutil.WaitForResult(func() (bool, error) {
82		nodes := s1.connectedNodes()
83		return len(nodes) == 0, nil
84	}, func(err error) {
85		t.Fatalf("should have no clients")
86	})
87}
88
89// This test asserts that we only track node connections if they are not from
90// forwarded RPCs. This is essential otherwise we will think a Yamux session to
91// a Nomad server is actually the session to the node.
92func TestClientEndpoint_Register_NodeConn_Forwarded(t *testing.T) {
93	t.Parallel()
94	require := require.New(t)
95
96	s1, cleanupS1 := TestServer(t, func(c *Config) {
97		c.BootstrapExpect = 2
98	})
99
100	defer cleanupS1()
101	s2, cleanupS2 := TestServer(t, func(c *Config) {
102		c.BootstrapExpect = 2
103	})
104	defer cleanupS2()
105	TestJoin(t, s1, s2)
106	testutil.WaitForLeader(t, s1.RPC)
107	testutil.WaitForLeader(t, s2.RPC)
108
109	// Determine the non-leader server
110	var leader, nonLeader *Server
111	if s1.IsLeader() {
112		leader = s1
113		nonLeader = s2
114	} else {
115		leader = s2
116		nonLeader = s1
117	}
118
119	// Send the requests to the non-leader
120	codec := rpcClient(t, nonLeader)
121
122	// Check that we have no client connections
123	require.Empty(nonLeader.connectedNodes())
124	require.Empty(leader.connectedNodes())
125
126	// Create the register request
127	node := mock.Node()
128	req := &structs.NodeRegisterRequest{
129		Node:         node,
130		WriteRequest: structs.WriteRequest{Region: "global"},
131	}
132
133	// Fetch the response
134	var resp structs.GenericResponse
135	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp); err != nil {
136		t.Fatalf("err: %v", err)
137	}
138	if resp.Index == 0 {
139		t.Fatalf("bad index: %d", resp.Index)
140	}
141
142	// Check that we have the client connections on the non leader
143	nodes := nonLeader.connectedNodes()
144	require.Len(nodes, 1)
145	require.Contains(nodes, node.ID)
146
147	// Check that we have no client connections on the leader
148	nodes = leader.connectedNodes()
149	require.Empty(nodes)
150
151	// Check for the node in the FSM
152	state := leader.State()
153	testutil.WaitForResult(func() (bool, error) {
154		out, err := state.NodeByID(nil, node.ID)
155		if err != nil {
156			return false, err
157		}
158		if out == nil {
159			return false, fmt.Errorf("expected node")
160		}
161		if out.CreateIndex != resp.Index {
162			return false, fmt.Errorf("index mis-match")
163		}
164		if out.ComputedClass == "" {
165			return false, fmt.Errorf("ComputedClass not set")
166		}
167
168		return true, nil
169	}, func(err error) {
170		t.Fatalf("err: %v", err)
171	})
172
173	// Close the connection and check that we remove the client connections
174	require.Nil(codec.Close())
175	testutil.WaitForResult(func() (bool, error) {
176		nodes := nonLeader.connectedNodes()
177		return len(nodes) == 0, nil
178	}, func(err error) {
179		t.Fatalf("should have no clients")
180	})
181}
182
183func TestClientEndpoint_Register_SecretMismatch(t *testing.T) {
184	t.Parallel()
185
186	s1, cleanupS1 := TestServer(t, nil)
187	defer cleanupS1()
188	codec := rpcClient(t, s1)
189	testutil.WaitForLeader(t, s1.RPC)
190
191	// Create the register request
192	node := mock.Node()
193	req := &structs.NodeRegisterRequest{
194		Node:         node,
195		WriteRequest: structs.WriteRequest{Region: "global"},
196	}
197
198	// Fetch the response
199	var resp structs.GenericResponse
200	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp); err != nil {
201		t.Fatalf("err: %v", err)
202	}
203
204	// Update the nodes SecretID
205	node.SecretID = uuid.Generate()
206	err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp)
207	if err == nil || !strings.Contains(err.Error(), "Not registering") {
208		t.Fatalf("Expecting error regarding mismatching secret id: %v", err)
209	}
210}
211
212// Test the deprecated single node deregistration path
213func TestClientEndpoint_DeregisterOne(t *testing.T) {
214	t.Parallel()
215
216	s1, cleanupS1 := TestServer(t, nil)
217	defer cleanupS1()
218	codec := rpcClient(t, s1)
219	testutil.WaitForLeader(t, s1.RPC)
220
221	// Create the register request
222	node := mock.Node()
223	reg := &structs.NodeRegisterRequest{
224		Node:         node,
225		WriteRequest: structs.WriteRequest{Region: "global"},
226	}
227
228	// Fetch the response
229	var resp structs.GenericResponse
230	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
231		t.Fatalf("err: %v", err)
232	}
233
234	// Deregister
235	dereg := &structs.NodeDeregisterRequest{
236		NodeID:       node.ID,
237		WriteRequest: structs.WriteRequest{Region: "global"},
238	}
239	var resp2 structs.GenericResponse
240	if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp2); err != nil {
241		t.Fatalf("err: %v", err)
242	}
243	if resp2.Index == 0 {
244		t.Fatalf("bad index: %d", resp2.Index)
245	}
246
247	// Check for the node in the FSM
248	state := s1.fsm.State()
249	ws := memdb.NewWatchSet()
250	out, err := state.NodeByID(ws, node.ID)
251	if err != nil {
252		t.Fatalf("err: %v", err)
253	}
254	if out != nil {
255		t.Fatalf("unexpected node")
256	}
257}
258
259func TestClientEndpoint_Deregister_ACL(t *testing.T) {
260	t.Parallel()
261
262	s1, root, cleanupS1 := TestACLServer(t, nil)
263	defer cleanupS1()
264	codec := rpcClient(t, s1)
265	testutil.WaitForLeader(t, s1.RPC)
266
267	// Create the node
268	node := mock.Node()
269	node1 := mock.Node()
270	state := s1.fsm.State()
271	if err := state.UpsertNode(structs.MsgTypeTestSetup, 1, node); err != nil {
272		t.Fatalf("err: %v", err)
273	}
274	if err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node1); err != nil {
275		t.Fatalf("err: %v", err)
276	}
277
278	// Create the policy and tokens
279	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite))
280	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))
281
282	// Deregister without any token and expect it to fail
283	dereg := &structs.NodeBatchDeregisterRequest{
284		NodeIDs:      []string{node.ID},
285		WriteRequest: structs.WriteRequest{Region: "global"},
286	}
287	var resp structs.GenericResponse
288	if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err == nil {
289		t.Fatalf("node de-register succeeded")
290	}
291
292	// Deregister with a valid token
293	dereg.AuthToken = validToken.SecretID
294	if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err != nil {
295		t.Fatalf("err: %v", err)
296	}
297
298	// Check for the node in the FSM
299	ws := memdb.NewWatchSet()
300	out, err := state.NodeByID(ws, node.ID)
301	if err != nil {
302		t.Fatalf("err: %v", err)
303	}
304	if out != nil {
305		t.Fatalf("unexpected node")
306	}
307
308	// Deregister with an invalid token.
309	dereg1 := &structs.NodeBatchDeregisterRequest{
310		NodeIDs:      []string{node1.ID},
311		WriteRequest: structs.WriteRequest{Region: "global"},
312	}
313	dereg1.AuthToken = invalidToken.SecretID
314	if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err == nil {
315		t.Fatalf("rpc should not have succeeded")
316	}
317
318	// Try with a root token
319	dereg1.AuthToken = root.SecretID
320	if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err != nil {
321		t.Fatalf("err: %v", err)
322	}
323}
324
325func TestClientEndpoint_Deregister_Vault(t *testing.T) {
326	t.Parallel()
327
328	s1, cleanupS1 := TestServer(t, nil)
329	defer cleanupS1()
330	codec := rpcClient(t, s1)
331	testutil.WaitForLeader(t, s1.RPC)
332
333	// Create the register request
334	node := mock.Node()
335	reg := &structs.NodeRegisterRequest{
336		Node:         node,
337		WriteRequest: structs.WriteRequest{Region: "global"},
338	}
339
340	// Fetch the response
341	var resp structs.GenericResponse
342	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
343		t.Fatalf("err: %v", err)
344	}
345
346	// Swap the servers Vault Client
347	tvc := &TestVaultClient{}
348	s1.vault = tvc
349
350	// Put some Vault accessors in the state store for that node
351	state := s1.fsm.State()
352	va1 := mock.VaultAccessor()
353	va1.NodeID = node.ID
354	va2 := mock.VaultAccessor()
355	va2.NodeID = node.ID
356	state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2})
357
358	// Deregister
359	dereg := &structs.NodeBatchDeregisterRequest{
360		NodeIDs:      []string{node.ID},
361		WriteRequest: structs.WriteRequest{Region: "global"},
362	}
363	var resp2 structs.GenericResponse
364	if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp2); err != nil {
365		t.Fatalf("err: %v", err)
366	}
367	if resp2.Index == 0 {
368		t.Fatalf("bad index: %d", resp2.Index)
369	}
370
371	// Check for the node in the FSM
372	ws := memdb.NewWatchSet()
373	out, err := state.NodeByID(ws, node.ID)
374	if err != nil {
375		t.Fatalf("err: %v", err)
376	}
377	if out != nil {
378		t.Fatalf("unexpected node")
379	}
380
381	// Check that the endpoint revoked the tokens
382	if l := len(tvc.RevokedTokens); l != 2 {
383		t.Fatalf("Deregister revoked %d tokens; want 2", l)
384	}
385}
386
387func TestClientEndpoint_UpdateStatus(t *testing.T) {
388	t.Parallel()
389	require := require.New(t)
390
391	s1, cleanupS1 := TestServer(t, nil)
392	defer cleanupS1()
393	codec := rpcClient(t, s1)
394	testutil.WaitForLeader(t, s1.RPC)
395
396	// Check that we have no client connections
397	require.Empty(s1.connectedNodes())
398
399	// Create the register request
400	node := mock.Node()
401	reg := &structs.NodeRegisterRequest{
402		Node:         node,
403		WriteRequest: structs.WriteRequest{Region: "global"},
404	}
405
406	// Fetch the response
407	var resp structs.NodeUpdateResponse
408	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
409		t.Fatalf("err: %v", err)
410	}
411
412	// Check for heartbeat interval
413	ttl := resp.HeartbeatTTL
414	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
415		t.Fatalf("bad: %#v", ttl)
416	}
417
418	// Update the status
419	dereg := &structs.NodeUpdateStatusRequest{
420		NodeID:       node.ID,
421		Status:       structs.NodeStatusInit,
422		WriteRequest: structs.WriteRequest{Region: "global"},
423	}
424	var resp2 structs.NodeUpdateResponse
425	if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2); err != nil {
426		t.Fatalf("err: %v", err)
427	}
428	if resp2.Index == 0 {
429		t.Fatalf("bad index: %d", resp2.Index)
430	}
431
432	// Check for heartbeat interval
433	ttl = resp2.HeartbeatTTL
434	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
435		t.Fatalf("bad: %#v", ttl)
436	}
437
438	// Check that we have the client connections
439	nodes := s1.connectedNodes()
440	require.Len(nodes, 1)
441	require.Contains(nodes, node.ID)
442
443	// Check for the node in the FSM
444	state := s1.fsm.State()
445	ws := memdb.NewWatchSet()
446	out, err := state.NodeByID(ws, node.ID)
447	if err != nil {
448		t.Fatalf("err: %v", err)
449	}
450	if out == nil {
451		t.Fatalf("expected node")
452	}
453	if out.ModifyIndex != resp2.Index {
454		t.Fatalf("index mis-match")
455	}
456
457	// Close the connection and check that we remove the client connections
458	require.Nil(codec.Close())
459	testutil.WaitForResult(func() (bool, error) {
460		nodes := s1.connectedNodes()
461		return len(nodes) == 0, nil
462	}, func(err error) {
463		t.Fatalf("should have no clients")
464	})
465}
466
467func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
468	t.Parallel()
469
470	s1, cleanupS1 := TestServer(t, nil)
471	defer cleanupS1()
472	codec := rpcClient(t, s1)
473	testutil.WaitForLeader(t, s1.RPC)
474
475	// Create the register request
476	node := mock.Node()
477	reg := &structs.NodeRegisterRequest{
478		Node:         node,
479		WriteRequest: structs.WriteRequest{Region: "global"},
480	}
481
482	// Fetch the response
483	var resp structs.NodeUpdateResponse
484	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
485		t.Fatalf("err: %v", err)
486	}
487
488	// Check for heartbeat interval
489	ttl := resp.HeartbeatTTL
490	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
491		t.Fatalf("bad: %#v", ttl)
492	}
493
494	// Swap the servers Vault Client
495	tvc := &TestVaultClient{}
496	s1.vault = tvc
497
498	// Put some Vault accessors in the state store for that node
499	state := s1.fsm.State()
500	va1 := mock.VaultAccessor()
501	va1.NodeID = node.ID
502	va2 := mock.VaultAccessor()
503	va2.NodeID = node.ID
504	state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2})
505
506	// Update the status to be down
507	dereg := &structs.NodeUpdateStatusRequest{
508		NodeID:       node.ID,
509		Status:       structs.NodeStatusDown,
510		WriteRequest: structs.WriteRequest{Region: "global"},
511	}
512	var resp2 structs.NodeUpdateResponse
513	if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2); err != nil {
514		t.Fatalf("err: %v", err)
515	}
516	if resp2.Index == 0 {
517		t.Fatalf("bad index: %d", resp2.Index)
518	}
519
520	// Check that the endpoint revoked the tokens
521	if l := len(tvc.RevokedTokens); l != 2 {
522		t.Fatalf("Deregister revoked %d tokens; want 2", l)
523	}
524}
525
526func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
527	t.Parallel()
528	require := require.New(t)
529
530	s1, cleanupS1 := TestServer(t, nil)
531	defer cleanupS1()
532	codec := rpcClient(t, s1)
533	testutil.WaitForLeader(t, s1.RPC)
534
535	// Check that we have no client connections
536	require.Empty(s1.connectedNodes())
537
538	// Create the register request but make the node down
539	node := mock.Node()
540	node.Status = structs.NodeStatusDown
541	reg := &structs.NodeRegisterRequest{
542		Node:         node,
543		WriteRequest: structs.WriteRequest{Region: "global"},
544	}
545
546	// Fetch the response
547	var resp structs.NodeUpdateResponse
548	require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
549
550	// Update the status
551	dereg := &structs.NodeUpdateStatusRequest{
552		NodeID:       node.ID,
553		Status:       structs.NodeStatusInit,
554		WriteRequest: structs.WriteRequest{Region: "global"},
555	}
556	var resp2 structs.NodeUpdateResponse
557	require.NoError(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2))
558	require.NotZero(resp2.Index)
559
560	// Check for heartbeat interval
561	ttl := resp2.HeartbeatTTL
562	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
563		t.Fatalf("bad: %#v", ttl)
564	}
565
566	// Check for the node in the FSM
567	state := s1.fsm.State()
568	ws := memdb.NewWatchSet()
569	out, err := state.NodeByID(ws, node.ID)
570	require.NoError(err)
571	require.NotNil(out)
572	require.EqualValues(resp2.Index, out.ModifyIndex)
573	require.Len(out.Events, 2)
574	require.Equal(NodeHeartbeatEventReregistered, out.Events[1].Message)
575}
576
577func TestClientEndpoint_Register_GetEvals(t *testing.T) {
578	t.Parallel()
579
580	s1, cleanupS1 := TestServer(t, nil)
581	defer cleanupS1()
582	codec := rpcClient(t, s1)
583	testutil.WaitForLeader(t, s1.RPC)
584
585	// Register a system job.
586	job := mock.SystemJob()
587	state := s1.fsm.State()
588	if err := state.UpsertJob(structs.MsgTypeTestSetup, 1, job); err != nil {
589		t.Fatalf("err: %v", err)
590	}
591
592	// Create the register request going directly to ready
593	node := mock.Node()
594	node.Status = structs.NodeStatusReady
595	reg := &structs.NodeRegisterRequest{
596		Node:         node,
597		WriteRequest: structs.WriteRequest{Region: "global"},
598	}
599
600	// Fetch the response
601	var resp structs.NodeUpdateResponse
602	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
603		t.Fatalf("err: %v", err)
604	}
605
606	// Check for heartbeat interval
607	ttl := resp.HeartbeatTTL
608	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
609		t.Fatalf("bad: %#v", ttl)
610	}
611
612	// Check for an eval caused by the system job.
613	if len(resp.EvalIDs) != 1 {
614		t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
615	}
616
617	evalID := resp.EvalIDs[0]
618	ws := memdb.NewWatchSet()
619	eval, err := state.EvalByID(ws, evalID)
620	if err != nil {
621		t.Fatalf("could not get eval %v", evalID)
622	}
623
624	if eval.Type != "system" {
625		t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system")
626	}
627
628	// Check for the node in the FSM
629	out, err := state.NodeByID(ws, node.ID)
630	if err != nil {
631		t.Fatalf("err: %v", err)
632	}
633	if out == nil {
634		t.Fatalf("expected node")
635	}
636	if out.ModifyIndex != resp.Index {
637		t.Fatalf("index mis-match")
638	}
639
640	// Transition it to down and then ready
641	node.Status = structs.NodeStatusDown
642	reg = &structs.NodeRegisterRequest{
643		Node:         node,
644		WriteRequest: structs.WriteRequest{Region: "global"},
645	}
646
647	// Fetch the response
648	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
649		t.Fatalf("err: %v", err)
650	}
651
652	if len(resp.EvalIDs) != 1 {
653		t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
654	}
655
656	node.Status = structs.NodeStatusReady
657	reg = &structs.NodeRegisterRequest{
658		Node:         node,
659		WriteRequest: structs.WriteRequest{Region: "global"},
660	}
661
662	// Fetch the response
663	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
664		t.Fatalf("err: %v", err)
665	}
666
667	if len(resp.EvalIDs) != 1 {
668		t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
669	}
670}
671
672func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) {
673	t.Parallel()
674
675	s1, cleanupS1 := TestServer(t, nil)
676	defer cleanupS1()
677	codec := rpcClient(t, s1)
678	testutil.WaitForLeader(t, s1.RPC)
679
680	// Register a system job.
681	job := mock.SystemJob()
682	state := s1.fsm.State()
683	if err := state.UpsertJob(structs.MsgTypeTestSetup, 1, job); err != nil {
684		t.Fatalf("err: %v", err)
685	}
686
687	// Create the register request
688	node := mock.Node()
689	node.Status = structs.NodeStatusInit
690	reg := &structs.NodeRegisterRequest{
691		Node:         node,
692		WriteRequest: structs.WriteRequest{Region: "global"},
693	}
694
695	// Fetch the response
696	var resp structs.NodeUpdateResponse
697	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
698		t.Fatalf("err: %v", err)
699	}
700
701	// Check for heartbeat interval
702	ttl := resp.HeartbeatTTL
703	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
704		t.Fatalf("bad: %#v", ttl)
705	}
706
707	// Update the status
708	update := &structs.NodeUpdateStatusRequest{
709		NodeID:       node.ID,
710		Status:       structs.NodeStatusReady,
711		WriteRequest: structs.WriteRequest{Region: "global"},
712	}
713	var resp2 structs.NodeUpdateResponse
714	if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", update, &resp2); err != nil {
715		t.Fatalf("err: %v", err)
716	}
717	if resp2.Index == 0 {
718		t.Fatalf("bad index: %d", resp2.Index)
719	}
720
721	// Check for an eval caused by the system job.
722	if len(resp2.EvalIDs) != 1 {
723		t.Fatalf("expected one eval; got %#v", resp2.EvalIDs)
724	}
725
726	evalID := resp2.EvalIDs[0]
727	ws := memdb.NewWatchSet()
728	eval, err := state.EvalByID(ws, evalID)
729	if err != nil {
730		t.Fatalf("could not get eval %v", evalID)
731	}
732
733	if eval.Type != "system" {
734		t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system")
735	}
736
737	// Check for heartbeat interval
738	ttl = resp2.HeartbeatTTL
739	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
740		t.Fatalf("bad: %#v", ttl)
741	}
742
743	// Check for the node in the FSM
744	out, err := state.NodeByID(ws, node.ID)
745	if err != nil {
746		t.Fatalf("err: %v", err)
747	}
748	if out == nil {
749		t.Fatalf("expected node")
750	}
751	if out.ModifyIndex != resp2.Index {
752		t.Fatalf("index mis-match")
753	}
754}
755
756func TestClientEndpoint_UpdateStatus_HeartbeatOnly(t *testing.T) {
757	t.Parallel()
758
759	s1, cleanupS1 := TestServer(t, func(c *Config) {
760		c.BootstrapExpect = 3
761	})
762	defer cleanupS1()
763
764	s2, cleanupS2 := TestServer(t, func(c *Config) {
765		c.BootstrapExpect = 3
766	})
767	defer cleanupS2()
768
769	s3, cleanupS3 := TestServer(t, func(c *Config) {
770		c.BootstrapExpect = 3
771	})
772	defer cleanupS3()
773	servers := []*Server{s1, s2, s3}
774	TestJoin(t, s1, s2, s3)
775
776	for _, s := range servers {
777		testutil.WaitForResult(func() (bool, error) {
778			peers, _ := s.numPeers()
779			return peers == 3, nil
780		}, func(err error) {
781			t.Fatalf("should have 3 peers")
782		})
783	}
784
785	codec := rpcClient(t, s1)
786	testutil.WaitForLeader(t, s1.RPC)
787
788	// Create the register request
789	node := mock.Node()
790	reg := &structs.NodeRegisterRequest{
791		Node:         node,
792		WriteRequest: structs.WriteRequest{Region: "global"},
793	}
794
795	// Fetch the response
796	var resp structs.NodeUpdateResponse
797	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
798		t.Fatalf("err: %v", err)
799	}
800
801	// Check for heartbeat interval
802	ttl := resp.HeartbeatTTL
803	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
804		t.Fatalf("bad: %#v", ttl)
805	}
806
807	// Check for heartbeat servers
808	serverAddrs := resp.Servers
809	if len(serverAddrs) == 0 {
810		t.Fatalf("bad: %#v", serverAddrs)
811	}
812
813	// Update the status, static state
814	dereg := &structs.NodeUpdateStatusRequest{
815		NodeID:       node.ID,
816		Status:       node.Status,
817		WriteRequest: structs.WriteRequest{Region: "global"},
818	}
819	var resp2 structs.NodeUpdateResponse
820	if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2); err != nil {
821		t.Fatalf("err: %v", err)
822	}
823	if resp2.Index != 0 {
824		t.Fatalf("bad index: %d", resp2.Index)
825	}
826
827	// Check for heartbeat interval
828	ttl = resp2.HeartbeatTTL
829	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
830		t.Fatalf("bad: %#v", ttl)
831	}
832}
833
834func TestClientEndpoint_UpdateStatus_HeartbeatOnly_Advertise(t *testing.T) {
835	t.Parallel()
836	require := require.New(t)
837
838	advAddr := "127.0.1.1:1234"
839	adv, err := net.ResolveTCPAddr("tcp", advAddr)
840	require.Nil(err)
841
842	s1, cleanupS1 := TestServer(t, func(c *Config) {
843		c.ClientRPCAdvertise = adv
844	})
845	defer cleanupS1()
846	codec := rpcClient(t, s1)
847	testutil.WaitForLeader(t, s1.RPC)
848
849	// Create the register request
850	node := mock.Node()
851	reg := &structs.NodeRegisterRequest{
852		Node:         node,
853		WriteRequest: structs.WriteRequest{Region: "global"},
854	}
855
856	// Fetch the response
857	var resp structs.NodeUpdateResponse
858	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
859		t.Fatalf("err: %v", err)
860	}
861
862	// Check for heartbeat interval
863	ttl := resp.HeartbeatTTL
864	if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
865		t.Fatalf("bad: %#v", ttl)
866	}
867
868	// Check for heartbeat servers
869	require.Len(resp.Servers, 1)
870	require.Equal(resp.Servers[0].RPCAdvertiseAddr, advAddr)
871}
872
873// TestClientEndpoint_UpdateDrain asserts the ability to initiate drain
874// against a node and cancel that drain. It also asserts:
875// * an evaluation is created when the node becomes eligible
876// * drain metadata is properly persisted in Node.LastDrain
877func TestClientEndpoint_UpdateDrain(t *testing.T) {
878	t.Parallel()
879	require := require.New(t)
880
881	s1, cleanupS1 := TestServer(t, nil)
882	defer cleanupS1()
883	codec := rpcClient(t, s1)
884	testutil.WaitForLeader(t, s1.RPC)
885
886	// Disable drainer to prevent drain from completing during test
887	s1.nodeDrainer.SetEnabled(false, nil)
888
889	// Create the register request
890	node := mock.Node()
891	reg := &structs.NodeRegisterRequest{
892		Node:         node,
893		WriteRequest: structs.WriteRequest{Region: "global"},
894	}
895
896	// Fetch the response
897	var resp structs.NodeUpdateResponse
898	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
899
900	beforeUpdate := time.Now()
901	strategy := &structs.DrainStrategy{
902		DrainSpec: structs.DrainSpec{
903			Deadline: 10 * time.Second,
904		},
905	}
906
907	// Update the status
908	dereg := &structs.NodeUpdateDrainRequest{
909		NodeID:        node.ID,
910		DrainStrategy: strategy,
911		Meta:          map[string]string{"message": "this node is not needed"},
912		WriteRequest:  structs.WriteRequest{Region: "global"},
913	}
914	var resp2 structs.NodeDrainUpdateResponse
915	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))
916	require.NotZero(resp2.Index)
917
918	// Check for the node in the FSM
919	state := s1.fsm.State()
920	ws := memdb.NewWatchSet()
921	out, err := state.NodeByID(ws, node.ID)
922	require.Nil(err)
923	require.NotNil(out.DrainStrategy)
924	require.Equal(strategy.Deadline, out.DrainStrategy.Deadline)
925	require.Len(out.Events, 2)
926	require.Equal(NodeDrainEventDrainSet, out.Events[1].Message)
927	require.NotNil(out.LastDrain)
928	require.Equal(structs.DrainMetadata{
929		StartedAt: out.LastDrain.UpdatedAt,
930		UpdatedAt: out.LastDrain.StartedAt,
931		Status:    structs.DrainStatusDraining,
932		Meta:      map[string]string{"message": "this node is not needed"},
933	}, *out.LastDrain)
934
935	// before+deadline should be before the forced deadline
936	require.True(beforeUpdate.Add(strategy.Deadline).Before(out.DrainStrategy.ForceDeadline))
937
938	// now+deadline should be after the forced deadline
939	require.True(time.Now().Add(strategy.Deadline).After(out.DrainStrategy.ForceDeadline))
940
941	drainStartedAt := out.DrainStrategy.StartedAt
942	// StartedAt should be close to the time the drain started
943	require.WithinDuration(beforeUpdate, drainStartedAt, 1*time.Second)
944
945	// StartedAt shouldn't change if a new request comes while still draining
946	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))
947	ws = memdb.NewWatchSet()
948	out, err = state.NodeByID(ws, node.ID)
949	require.NoError(err)
950	require.True(out.DrainStrategy.StartedAt.Equal(drainStartedAt))
951
952	// Register a system job
953	job := mock.SystemJob()
954	require.Nil(s1.State().UpsertJob(structs.MsgTypeTestSetup, 10, job))
955
956	// Update the eligibility and expect evals
957	dereg.DrainStrategy = nil
958	dereg.MarkEligible = true
959	dereg.Meta = map[string]string{"cancelled": "yes"}
960	var resp3 structs.NodeDrainUpdateResponse
961	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp3))
962	require.NotZero(resp3.Index)
963	require.NotZero(resp3.EvalCreateIndex)
964	require.Len(resp3.EvalIDs, 1)
965
966	// Check for updated node in the FSM
967	ws = memdb.NewWatchSet()
968	out, err = state.NodeByID(ws, node.ID)
969	require.NoError(err)
970	require.Len(out.Events, 4)
971	require.Equal(NodeDrainEventDrainDisabled, out.Events[3].Message)
972	require.NotNil(out.LastDrain)
973	require.NotNil(out.LastDrain)
974	require.False(out.LastDrain.UpdatedAt.Before(out.LastDrain.StartedAt))
975	require.Equal(structs.DrainMetadata{
976		StartedAt: out.LastDrain.StartedAt,
977		UpdatedAt: out.LastDrain.UpdatedAt,
978		Status:    structs.DrainStatusCanceled,
979		Meta:      map[string]string{"cancelled": "yes"},
980	}, *out.LastDrain)
981
982	// Check that calling UpdateDrain with the same DrainStrategy does not emit
983	// a node event.
984	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp3))
985	ws = memdb.NewWatchSet()
986	out, err = state.NodeByID(ws, node.ID)
987	require.NoError(err)
988	require.Len(out.Events, 4)
989}
990
991// TestClientEndpoint_UpdatedDrainAndCompleted asserts that drain metadata
992// is properly persisted in Node.LastDrain as the node drain is updated and
993// completes.
994func TestClientEndpoint_UpdatedDrainAndCompleted(t *testing.T) {
995	t.Parallel()
996	require := require.New(t)
997
998	s1, cleanupS1 := TestServer(t, nil)
999	defer cleanupS1()
1000	codec := rpcClient(t, s1)
1001	testutil.WaitForLeader(t, s1.RPC)
1002	state := s1.fsm.State()
1003
1004	// Disable drainer for now
1005	s1.nodeDrainer.SetEnabled(false, nil)
1006
1007	// Create the register request
1008	node := mock.Node()
1009	reg := &structs.NodeRegisterRequest{
1010		Node:         node,
1011		WriteRequest: structs.WriteRequest{Region: "global"},
1012	}
1013
1014	// Fetch the response
1015	var resp structs.NodeUpdateResponse
1016	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
1017
1018	strategy := &structs.DrainStrategy{
1019		DrainSpec: structs.DrainSpec{
1020			Deadline: 10 * time.Second,
1021		},
1022	}
1023
1024	// Update the status
1025	dereg := &structs.NodeUpdateDrainRequest{
1026		NodeID:        node.ID,
1027		DrainStrategy: strategy,
1028		Meta: map[string]string{
1029			"message": "first drain",
1030		},
1031		WriteRequest: structs.WriteRequest{Region: "global"},
1032	}
1033	var resp2 structs.NodeDrainUpdateResponse
1034	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))
1035	require.NotZero(resp2.Index)
1036
1037	// Check for the node in the FSM
1038	out, err := state.NodeByID(nil, node.ID)
1039	require.Nil(err)
1040	require.NotNil(out.DrainStrategy)
1041	require.NotNil(out.LastDrain)
1042	firstDrainUpdate := out.LastDrain.UpdatedAt
1043	require.Equal(structs.DrainMetadata{
1044		StartedAt: firstDrainUpdate,
1045		UpdatedAt: firstDrainUpdate,
1046		Status:    structs.DrainStatusDraining,
1047		Meta:      map[string]string{"message": "first drain"},
1048	}, *out.LastDrain)
1049
1050	time.Sleep(1 * time.Second)
1051
1052	// Update the drain
1053	dereg.DrainStrategy.DrainSpec.Deadline *= 2
1054	dereg.Meta["message"] = "second drain"
1055	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))
1056	require.NotZero(resp2.Index)
1057
1058	out, err = state.NodeByID(nil, node.ID)
1059	require.Nil(err)
1060	require.NotNil(out.DrainStrategy)
1061	require.NotNil(out.LastDrain)
1062	secondDrainUpdate := out.LastDrain.UpdatedAt
1063	require.True(secondDrainUpdate.After(firstDrainUpdate))
1064	require.Equal(structs.DrainMetadata{
1065		StartedAt: firstDrainUpdate,
1066		UpdatedAt: secondDrainUpdate,
1067		Status:    structs.DrainStatusDraining,
1068		Meta:      map[string]string{"message": "second drain"},
1069	}, *out.LastDrain)
1070
1071	time.Sleep(1 * time.Second)
1072
1073	// Enable the drainer, wait for completion
1074	s1.nodeDrainer.SetEnabled(true, state)
1075
1076	testutil.WaitForResult(func() (bool, error) {
1077		out, err = state.NodeByID(nil, node.ID)
1078		if err != nil {
1079			return false, err
1080		}
1081		if out == nil {
1082			return false, fmt.Errorf("could not find node")
1083		}
1084		return out.DrainStrategy == nil, nil
1085	}, func(err error) {
1086		t.Fatalf("err: %v", err)
1087	})
1088
1089	require.True(out.LastDrain.UpdatedAt.After(secondDrainUpdate))
1090	require.Equal(structs.DrainMetadata{
1091		StartedAt: firstDrainUpdate,
1092		UpdatedAt: out.LastDrain.UpdatedAt,
1093		Status:    structs.DrainStatusComplete,
1094		Meta:      map[string]string{"message": "second drain"},
1095	}, *out.LastDrain)
1096}
1097
1098// TestClientEndpoint_UpdatedDrainNoop asserts that drain metadata is properly
1099// persisted in Node.LastDrain when calls to Node.UpdateDrain() don't affect
1100// the drain status.
1101func TestClientEndpoint_UpdatedDrainNoop(t *testing.T) {
1102	t.Parallel()
1103	require := require.New(t)
1104
1105	s1, cleanupS1 := TestServer(t, nil)
1106	defer cleanupS1()
1107	codec := rpcClient(t, s1)
1108	testutil.WaitForLeader(t, s1.RPC)
1109	state := s1.fsm.State()
1110
1111	// Create the register request
1112	node := mock.Node()
1113	reg := &structs.NodeRegisterRequest{
1114		Node:         node,
1115		WriteRequest: structs.WriteRequest{Region: "global"},
1116	}
1117
1118	// Fetch the response
1119	var resp structs.NodeUpdateResponse
1120	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
1121
1122	// Update the status
1123	dereg := &structs.NodeUpdateDrainRequest{
1124		NodeID: node.ID,
1125		DrainStrategy: &structs.DrainStrategy{
1126			DrainSpec: structs.DrainSpec{
1127				Deadline: 10 * time.Second,
1128			},
1129		},
1130		Meta: map[string]string{
1131			"message": "drain",
1132		},
1133		WriteRequest: structs.WriteRequest{Region: "global"},
1134	}
1135	var drainResp structs.NodeDrainUpdateResponse
1136	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &drainResp))
1137	require.NotZero(drainResp.Index)
1138
1139	var out *structs.Node
1140	testutil.WaitForResult(func() (bool, error) {
1141		var err error
1142		out, err = state.NodeByID(nil, node.ID)
1143		if err != nil {
1144			return false, err
1145		}
1146		if out == nil {
1147			return false, fmt.Errorf("could not find node")
1148		}
1149		return out.DrainStrategy == nil && out.SchedulingEligibility == structs.NodeSchedulingIneligible, nil
1150	}, func(err error) {
1151		t.Fatalf("err: %v", err)
1152	})
1153
1154	require.Equal(structs.DrainStatusComplete, out.LastDrain.Status)
1155	require.Equal(map[string]string{"message": "drain"}, out.LastDrain.Meta)
1156	prevDrain := out.LastDrain
1157
1158	// call again with Drain Strategy nil; should be a no-op because drain is already complete
1159	dereg.DrainStrategy = nil
1160	dereg.Meta = map[string]string{
1161		"new_message": "is new",
1162	}
1163	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &drainResp))
1164	require.NotZero(drainResp.Index)
1165
1166	out, err := state.NodeByID(nil, node.ID)
1167	require.Nil(err)
1168	require.Nil(out.DrainStrategy)
1169	require.NotNil(out.LastDrain)
1170	require.Equal(prevDrain, out.LastDrain)
1171}
1172
1173// TestClientEndpoint_UpdateDrain_ACL asserts that Node.UpdateDrain() enforces
1174// node.write ACLs, and that token accessor ID is properly persisted in
1175// Node.LastDrain.AccessorID
1176func TestClientEndpoint_UpdateDrain_ACL(t *testing.T) {
1177	t.Parallel()
1178
1179	s1, root, cleanupS1 := TestACLServer(t, nil)
1180	defer cleanupS1()
1181	codec := rpcClient(t, s1)
1182	testutil.WaitForLeader(t, s1.RPC)
1183	require := require.New(t)
1184
1185	// Create the node
1186	node := mock.Node()
1187	state := s1.fsm.State()
1188
1189	require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode")
1190
1191	// Create the policy and tokens
1192	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite))
1193	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))
1194
1195	// Update the status without a token and expect failure
1196	dereg := &structs.NodeUpdateDrainRequest{
1197		NodeID: node.ID,
1198		DrainStrategy: &structs.DrainStrategy{
1199			DrainSpec: structs.DrainSpec{
1200				Deadline: 10 * time.Second,
1201			},
1202		},
1203		WriteRequest: structs.WriteRequest{Region: "global"},
1204	}
1205	{
1206		var resp structs.NodeDrainUpdateResponse
1207		err := msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp)
1208		require.NotNil(err, "RPC")
1209		require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1210	}
1211
1212	// Try with a valid token
1213	dereg.AuthToken = validToken.SecretID
1214	{
1215		var resp structs.NodeDrainUpdateResponse
1216		require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp), "RPC")
1217		out, err := state.NodeByID(nil, node.ID)
1218		require.NoError(err)
1219		require.Equal(validToken.AccessorID, out.LastDrain.AccessorID)
1220	}
1221
1222	// Try with a invalid token
1223	dereg.AuthToken = invalidToken.SecretID
1224	{
1225		var resp structs.NodeDrainUpdateResponse
1226		err := msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp)
1227		require.NotNil(err, "RPC")
1228		require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1229	}
1230
1231	// Try with a root token
1232	dereg.DrainStrategy.DrainSpec.Deadline = 20 * time.Second
1233	dereg.AuthToken = root.SecretID
1234	{
1235		var resp structs.NodeDrainUpdateResponse
1236		require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp), "RPC")
1237		out, err := state.NodeByID(nil, node.ID)
1238		require.NoError(err)
1239		require.Equal(root.AccessorID, out.LastDrain.AccessorID)
1240	}
1241}
1242
1243// This test ensures that Nomad marks client state of allocations which are in
1244// pending/running state to lost when a node is marked as down.
1245func TestClientEndpoint_Drain_Down(t *testing.T) {
1246	t.Parallel()
1247
1248	s1, cleanupS1 := TestServer(t, nil)
1249	defer cleanupS1()
1250	codec := rpcClient(t, s1)
1251	testutil.WaitForLeader(t, s1.RPC)
1252	require := require.New(t)
1253
1254	// Register a node
1255	node := mock.Node()
1256	reg := &structs.NodeRegisterRequest{
1257		Node:         node,
1258		WriteRequest: structs.WriteRequest{Region: "global"},
1259	}
1260	// Fetch the response
1261	var resp structs.NodeUpdateResponse
1262	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
1263
1264	// Register a service job
1265	var jobResp structs.JobRegisterResponse
1266	job := mock.Job()
1267	job.TaskGroups[0].Count = 1
1268	jobReq := &structs.JobRegisterRequest{
1269		Job: job,
1270		WriteRequest: structs.WriteRequest{
1271			Region:    "global",
1272			Namespace: job.Namespace,
1273		},
1274	}
1275	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp))
1276
1277	// Register a system job
1278	var jobResp1 structs.JobRegisterResponse
1279	job1 := mock.SystemJob()
1280	job1.TaskGroups[0].Count = 1
1281	jobReq1 := &structs.JobRegisterRequest{
1282		Job: job1,
1283		WriteRequest: structs.WriteRequest{
1284			Region:    "global",
1285			Namespace: job1.Namespace,
1286		},
1287	}
1288	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq1, &jobResp1))
1289
1290	// Wait for the scheduler to create an allocation
1291	testutil.WaitForResult(func() (bool, error) {
1292		ws := memdb.NewWatchSet()
1293		allocs, err := s1.fsm.state.AllocsByJob(ws, job.Namespace, job.ID, true)
1294		if err != nil {
1295			return false, err
1296		}
1297		allocs1, err := s1.fsm.state.AllocsByJob(ws, job1.Namespace, job1.ID, true)
1298		if err != nil {
1299			return false, err
1300		}
1301		return len(allocs) > 0 && len(allocs1) > 0, nil
1302	}, func(err error) {
1303		t.Fatalf("err: %v", err)
1304	})
1305
1306	// Drain the node
1307	dereg := &structs.NodeUpdateDrainRequest{
1308		NodeID: node.ID,
1309		DrainStrategy: &structs.DrainStrategy{
1310			DrainSpec: structs.DrainSpec{
1311				Deadline: -1 * time.Second,
1312			},
1313		},
1314		WriteRequest: structs.WriteRequest{Region: "global"},
1315	}
1316	var resp2 structs.NodeDrainUpdateResponse
1317	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))
1318
1319	// Mark the node as down
1320	node.Status = structs.NodeStatusDown
1321	reg = &structs.NodeRegisterRequest{
1322		Node:         node,
1323		WriteRequest: structs.WriteRequest{Region: "global"},
1324	}
1325	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
1326
1327	// Ensure that the allocation has transitioned to lost
1328	testutil.WaitForResult(func() (bool, error) {
1329		ws := memdb.NewWatchSet()
1330		summary, err := s1.fsm.state.JobSummaryByID(ws, job.Namespace, job.ID)
1331		if err != nil {
1332			return false, err
1333		}
1334		expectedSummary := &structs.JobSummary{
1335			JobID:     job.ID,
1336			Namespace: job.Namespace,
1337			Summary: map[string]structs.TaskGroupSummary{
1338				"web": {
1339					Queued: 1,
1340					Lost:   1,
1341				},
1342			},
1343			Children:    new(structs.JobChildrenSummary),
1344			CreateIndex: jobResp.JobModifyIndex,
1345			ModifyIndex: summary.ModifyIndex,
1346		}
1347		if !reflect.DeepEqual(summary, expectedSummary) {
1348			return false, fmt.Errorf("Service: expected: %#v, actual: %#v", expectedSummary, summary)
1349		}
1350
1351		summary1, err := s1.fsm.state.JobSummaryByID(ws, job1.Namespace, job1.ID)
1352		if err != nil {
1353			return false, err
1354		}
1355		expectedSummary1 := &structs.JobSummary{
1356			JobID:     job1.ID,
1357			Namespace: job1.Namespace,
1358			Summary: map[string]structs.TaskGroupSummary{
1359				"web": {
1360					Lost: 1,
1361				},
1362			},
1363			Children:    new(structs.JobChildrenSummary),
1364			CreateIndex: jobResp1.JobModifyIndex,
1365			ModifyIndex: summary1.ModifyIndex,
1366		}
1367		if !reflect.DeepEqual(summary1, expectedSummary1) {
1368			return false, fmt.Errorf("System: expected: %#v, actual: %#v", expectedSummary1, summary1)
1369		}
1370		return true, nil
1371	}, func(err error) {
1372		t.Fatalf("err: %v", err)
1373	})
1374}
1375
1376func TestClientEndpoint_UpdateEligibility(t *testing.T) {
1377	t.Parallel()
1378	require := require.New(t)
1379
1380	s1, cleanupS1 := TestServer(t, nil)
1381	defer cleanupS1()
1382	codec := rpcClient(t, s1)
1383	testutil.WaitForLeader(t, s1.RPC)
1384
1385	// Create the register request
1386	node := mock.Node()
1387	reg := &structs.NodeRegisterRequest{
1388		Node:         node,
1389		WriteRequest: structs.WriteRequest{Region: "global"},
1390	}
1391
1392	// Fetch the response
1393	var resp structs.NodeUpdateResponse
1394	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
1395
1396	// Update the eligibility
1397	elig := &structs.NodeUpdateEligibilityRequest{
1398		NodeID:       node.ID,
1399		Eligibility:  structs.NodeSchedulingIneligible,
1400		WriteRequest: structs.WriteRequest{Region: "global"},
1401	}
1402	var resp2 structs.NodeEligibilityUpdateResponse
1403	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", elig, &resp2))
1404	require.NotZero(resp2.Index)
1405	require.Zero(resp2.EvalCreateIndex)
1406	require.Empty(resp2.EvalIDs)
1407
1408	// Check for the node in the FSM
1409	state := s1.fsm.State()
1410	out, err := state.NodeByID(nil, node.ID)
1411	require.Nil(err)
1412	require.Equal(out.SchedulingEligibility, structs.NodeSchedulingIneligible)
1413	require.Len(out.Events, 2)
1414	require.Equal(NodeEligibilityEventIneligible, out.Events[1].Message)
1415
1416	// Register a system job
1417	job := mock.SystemJob()
1418	require.Nil(s1.State().UpsertJob(structs.MsgTypeTestSetup, 10, job))
1419
1420	// Update the eligibility and expect evals
1421	elig.Eligibility = structs.NodeSchedulingEligible
1422	var resp3 structs.NodeEligibilityUpdateResponse
1423	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", elig, &resp3))
1424	require.NotZero(resp3.Index)
1425	require.NotZero(resp3.EvalCreateIndex)
1426	require.Len(resp3.EvalIDs, 1)
1427
1428	out, err = state.NodeByID(nil, node.ID)
1429	require.Nil(err)
1430	require.Len(out.Events, 3)
1431	require.Equal(NodeEligibilityEventEligible, out.Events[2].Message)
1432}
1433
1434func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
1435	t.Parallel()
1436
1437	s1, root, cleanupS1 := TestACLServer(t, nil)
1438	defer cleanupS1()
1439	codec := rpcClient(t, s1)
1440	testutil.WaitForLeader(t, s1.RPC)
1441	require := require.New(t)
1442
1443	// Create the node
1444	node := mock.Node()
1445	state := s1.fsm.State()
1446
1447	require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode")
1448
1449	// Create the policy and tokens
1450	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite))
1451	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))
1452
1453	// Update the status without a token and expect failure
1454	dereg := &structs.NodeUpdateEligibilityRequest{
1455		NodeID:       node.ID,
1456		Eligibility:  structs.NodeSchedulingIneligible,
1457		WriteRequest: structs.WriteRequest{Region: "global"},
1458	}
1459	{
1460		var resp structs.NodeEligibilityUpdateResponse
1461		err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp)
1462		require.NotNil(err, "RPC")
1463		require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1464	}
1465
1466	// Try with a valid token
1467	dereg.AuthToken = validToken.SecretID
1468	{
1469		var resp structs.NodeEligibilityUpdateResponse
1470		require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC")
1471	}
1472
1473	// Try with a invalid token
1474	dereg.AuthToken = invalidToken.SecretID
1475	{
1476		var resp structs.NodeEligibilityUpdateResponse
1477		err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp)
1478		require.NotNil(err, "RPC")
1479		require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1480	}
1481
1482	// Try with a root token
1483	dereg.AuthToken = root.SecretID
1484	{
1485		var resp structs.NodeEligibilityUpdateResponse
1486		require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC")
1487	}
1488}
1489
1490func TestClientEndpoint_GetNode(t *testing.T) {
1491	t.Parallel()
1492
1493	s1, cleanupS1 := TestServer(t, nil)
1494	defer cleanupS1()
1495	codec := rpcClient(t, s1)
1496	testutil.WaitForLeader(t, s1.RPC)
1497
1498	// Create the register request
1499	node := mock.Node()
1500	reg := &structs.NodeRegisterRequest{
1501		Node:         node,
1502		WriteRequest: structs.WriteRequest{Region: "global"},
1503	}
1504
1505	// Fetch the response
1506	var resp structs.GenericResponse
1507	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
1508		t.Fatalf("err: %v", err)
1509	}
1510	node.CreateIndex = resp.Index
1511	node.ModifyIndex = resp.Index
1512
1513	// Lookup the node
1514	get := &structs.NodeSpecificRequest{
1515		NodeID:       node.ID,
1516		QueryOptions: structs.QueryOptions{Region: "global"},
1517	}
1518	var resp2 structs.SingleNodeResponse
1519	if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", get, &resp2); err != nil {
1520		t.Fatalf("err: %v", err)
1521	}
1522	if resp2.Index != resp.Index {
1523		t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index)
1524	}
1525
1526	if resp2.Node.ComputedClass == "" {
1527		t.Fatalf("bad ComputedClass: %#v", resp2.Node)
1528	}
1529
1530	// Update the status updated at value
1531	node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt
1532	node.SecretID = ""
1533	node.Events = resp2.Node.Events
1534	if !reflect.DeepEqual(node, resp2.Node) {
1535		t.Fatalf("bad: %#v \n %#v", node, resp2.Node)
1536	}
1537
1538	// assert that the node register event was set correctly
1539	if len(resp2.Node.Events) != 1 {
1540		t.Fatalf("Did not set node events: %#v", resp2.Node)
1541	}
1542	if resp2.Node.Events[0].Message != state.NodeRegisterEventRegistered {
1543		t.Fatalf("Did not set node register event correctly: %#v", resp2.Node)
1544	}
1545
1546	// Lookup non-existing node
1547	get.NodeID = "12345678-abcd-efab-cdef-123456789abc"
1548	if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", get, &resp2); err != nil {
1549		t.Fatalf("err: %v", err)
1550	}
1551	if resp2.Index != resp.Index {
1552		t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index)
1553	}
1554	if resp2.Node != nil {
1555		t.Fatalf("unexpected node")
1556	}
1557}
1558
1559func TestClientEndpoint_GetNode_ACL(t *testing.T) {
1560	t.Parallel()
1561
1562	s1, root, cleanupS1 := TestACLServer(t, nil)
1563	defer cleanupS1()
1564	codec := rpcClient(t, s1)
1565	testutil.WaitForLeader(t, s1.RPC)
1566	assert := assert.New(t)
1567
1568	// Create the node
1569	node := mock.Node()
1570	state := s1.fsm.State()
1571	assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode")
1572
1573	// Create the policy and tokens
1574	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyRead))
1575	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyDeny))
1576
1577	// Lookup the node without a token and expect failure
1578	req := &structs.NodeSpecificRequest{
1579		NodeID:       node.ID,
1580		QueryOptions: structs.QueryOptions{Region: "global"},
1581	}
1582	{
1583		var resp structs.SingleNodeResponse
1584		err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp)
1585		assert.NotNil(err, "RPC")
1586		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1587	}
1588
1589	// Try with a valid token
1590	req.AuthToken = validToken.SecretID
1591	{
1592		var resp structs.SingleNodeResponse
1593		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp), "RPC")
1594		assert.Equal(node.ID, resp.Node.ID)
1595	}
1596
1597	// Try with a Node.SecretID
1598	req.AuthToken = node.SecretID
1599	{
1600		var resp structs.SingleNodeResponse
1601		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp), "RPC")
1602		assert.Equal(node.ID, resp.Node.ID)
1603	}
1604
1605	// Try with a invalid token
1606	req.AuthToken = invalidToken.SecretID
1607	{
1608		var resp structs.SingleNodeResponse
1609		err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp)
1610		assert.NotNil(err, "RPC")
1611		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1612	}
1613
1614	// Try with a root token
1615	req.AuthToken = root.SecretID
1616	{
1617		var resp structs.SingleNodeResponse
1618		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp), "RPC")
1619		assert.Equal(node.ID, resp.Node.ID)
1620	}
1621}
1622
1623func TestClientEndpoint_GetNode_Blocking(t *testing.T) {
1624	t.Parallel()
1625
1626	s1, cleanupS1 := TestServer(t, nil)
1627	defer cleanupS1()
1628	state := s1.fsm.State()
1629	codec := rpcClient(t, s1)
1630	testutil.WaitForLeader(t, s1.RPC)
1631
1632	// Create the node
1633	node1 := mock.Node()
1634	node2 := mock.Node()
1635
1636	// First create an unrelated node.
1637	time.AfterFunc(100*time.Millisecond, func() {
1638		if err := state.UpsertNode(structs.MsgTypeTestSetup, 100, node1); err != nil {
1639			t.Fatalf("err: %v", err)
1640		}
1641	})
1642
1643	// Upsert the node we are watching later
1644	time.AfterFunc(200*time.Millisecond, func() {
1645		if err := state.UpsertNode(structs.MsgTypeTestSetup, 200, node2); err != nil {
1646			t.Fatalf("err: %v", err)
1647		}
1648	})
1649
1650	// Lookup the node
1651	req := &structs.NodeSpecificRequest{
1652		NodeID: node2.ID,
1653		QueryOptions: structs.QueryOptions{
1654			Region:        "global",
1655			MinQueryIndex: 150,
1656		},
1657	}
1658	var resp structs.SingleNodeResponse
1659	start := time.Now()
1660	if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp); err != nil {
1661		t.Fatalf("err: %v", err)
1662	}
1663
1664	if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
1665		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
1666	}
1667	if resp.Index != 200 {
1668		t.Fatalf("Bad index: %d %d", resp.Index, 200)
1669	}
1670	if resp.Node == nil || resp.Node.ID != node2.ID {
1671		t.Fatalf("bad: %#v", resp.Node)
1672	}
1673
1674	// Node update triggers watches
1675	time.AfterFunc(100*time.Millisecond, func() {
1676		nodeUpdate := mock.Node()
1677		nodeUpdate.ID = node2.ID
1678		nodeUpdate.Status = structs.NodeStatusDown
1679		if err := state.UpsertNode(structs.MsgTypeTestSetup, 300, nodeUpdate); err != nil {
1680			t.Fatalf("err: %v", err)
1681		}
1682	})
1683
1684	req.QueryOptions.MinQueryIndex = 250
1685	var resp2 structs.SingleNodeResponse
1686	start = time.Now()
1687	if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp2); err != nil {
1688		t.Fatalf("err: %v", err)
1689	}
1690
1691	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
1692		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
1693	}
1694	if resp2.Index != 300 {
1695		t.Fatalf("Bad index: %d %d", resp2.Index, 300)
1696	}
1697	if resp2.Node == nil || resp2.Node.Status != structs.NodeStatusDown {
1698		t.Fatalf("bad: %#v", resp2.Node)
1699	}
1700
1701	// Node delete triggers watches
1702	time.AfterFunc(100*time.Millisecond, func() {
1703		if err := state.DeleteNode(structs.MsgTypeTestSetup, 400, []string{node2.ID}); err != nil {
1704			t.Fatalf("err: %v", err)
1705		}
1706	})
1707
1708	req.QueryOptions.MinQueryIndex = 350
1709	var resp3 structs.SingleNodeResponse
1710	start = time.Now()
1711	if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp3); err != nil {
1712		t.Fatalf("err: %v", err)
1713	}
1714
1715	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
1716		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
1717	}
1718	if resp3.Index != 400 {
1719		t.Fatalf("Bad index: %d %d", resp2.Index, 400)
1720	}
1721	if resp3.Node != nil {
1722		t.Fatalf("bad: %#v", resp3.Node)
1723	}
1724}
1725
1726func TestClientEndpoint_GetAllocs(t *testing.T) {
1727	t.Parallel()
1728
1729	s1, cleanupS1 := TestServer(t, nil)
1730	defer cleanupS1()
1731	codec := rpcClient(t, s1)
1732	testutil.WaitForLeader(t, s1.RPC)
1733
1734	// Create the register request
1735	node := mock.Node()
1736	reg := &structs.NodeRegisterRequest{
1737		Node:         node,
1738		WriteRequest: structs.WriteRequest{Region: "global"},
1739	}
1740
1741	// Fetch the response
1742	var resp structs.GenericResponse
1743	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
1744		t.Fatalf("err: %v", err)
1745	}
1746	node.CreateIndex = resp.Index
1747	node.ModifyIndex = resp.Index
1748
1749	// Inject fake evaluations
1750	alloc := mock.Alloc()
1751	alloc.NodeID = node.ID
1752	state := s1.fsm.State()
1753	state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
1754	err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
1755	if err != nil {
1756		t.Fatalf("err: %v", err)
1757	}
1758
1759	// Lookup the allocs
1760	get := &structs.NodeSpecificRequest{
1761		NodeID:       node.ID,
1762		QueryOptions: structs.QueryOptions{Region: "global"},
1763	}
1764	var resp2 structs.NodeAllocsResponse
1765	if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", get, &resp2); err != nil {
1766		t.Fatalf("err: %v", err)
1767	}
1768	if resp2.Index != 100 {
1769		t.Fatalf("Bad index: %d %d", resp2.Index, 100)
1770	}
1771
1772	if len(resp2.Allocs) != 1 || resp2.Allocs[0].ID != alloc.ID {
1773		t.Fatalf("bad: %#v", resp2.Allocs)
1774	}
1775
1776	// Lookup non-existing node
1777	get.NodeID = "foobarbaz"
1778	if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", get, &resp2); err != nil {
1779		t.Fatalf("err: %v", err)
1780	}
1781	if resp2.Index != 100 {
1782		t.Fatalf("Bad index: %d %d", resp2.Index, 100)
1783	}
1784	if len(resp2.Allocs) != 0 {
1785		t.Fatalf("unexpected node")
1786	}
1787}
1788
1789func TestClientEndpoint_GetAllocs_ACL_Basic(t *testing.T) {
1790	t.Parallel()
1791
1792	s1, root, cleanupS1 := TestACLServer(t, nil)
1793	defer cleanupS1()
1794	codec := rpcClient(t, s1)
1795	testutil.WaitForLeader(t, s1.RPC)
1796	assert := assert.New(t)
1797
1798	// Create the node
1799	allocDefaultNS := mock.Alloc()
1800	node := mock.Node()
1801	allocDefaultNS.NodeID = node.ID
1802	state := s1.fsm.State()
1803	assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode")
1804	assert.Nil(state.UpsertJobSummary(2, mock.JobSummary(allocDefaultNS.JobID)), "UpsertJobSummary")
1805	allocs := []*structs.Allocation{allocDefaultNS}
1806	assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 5, allocs), "UpsertAllocs")
1807
1808	// Create the namespace policy and tokens
1809	validDefaultToken := mock.CreatePolicyAndToken(t, state, 1001, "test-default-valid", mock.NodePolicy(acl.PolicyRead)+
1810		mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
1811	invalidToken := mock.CreatePolicyAndToken(t, state, 1004, "test-invalid",
1812		mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
1813
1814	req := &structs.NodeSpecificRequest{
1815		NodeID: node.ID,
1816		QueryOptions: structs.QueryOptions{
1817			Region: "global",
1818		},
1819	}
1820
1821	// Lookup the node without a token and expect failure
1822	{
1823		var resp structs.NodeAllocsResponse
1824		err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp)
1825		assert.NotNil(err, "RPC")
1826		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1827	}
1828
1829	// Try with a valid token for the default namespace
1830	req.AuthToken = validDefaultToken.SecretID
1831	{
1832		var resp structs.NodeAllocsResponse
1833		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC")
1834		assert.Len(resp.Allocs, 1)
1835		assert.Equal(allocDefaultNS.ID, resp.Allocs[0].ID)
1836	}
1837
1838	// Try with a invalid token
1839	req.AuthToken = invalidToken.SecretID
1840	{
1841		var resp structs.NodeAllocsResponse
1842		err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp)
1843		assert.NotNil(err, "RPC")
1844		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1845	}
1846
1847	// Try with a root token
1848	req.AuthToken = root.SecretID
1849	{
1850		var resp structs.NodeAllocsResponse
1851		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC")
1852		assert.Len(resp.Allocs, 1)
1853		for _, alloc := range resp.Allocs {
1854			switch alloc.ID {
1855			case allocDefaultNS.ID:
1856				// expected
1857			default:
1858				t.Errorf("unexpected alloc %q for namespace %q", alloc.ID, alloc.Namespace)
1859			}
1860		}
1861	}
1862}
1863
1864func TestClientEndpoint_GetAllocs_ACL_Namespaces(t *testing.T) {
1865	t.Parallel()
1866	s1, root, cleanupS1 := TestACLServer(t, nil)
1867	defer cleanupS1()
1868	codec := rpcClient(t, s1)
1869	testutil.WaitForLeader(t, s1.RPC)
1870	assert := assert.New(t)
1871
1872	// Create the namespaces
1873	ns1 := mock.Namespace()
1874	ns2 := mock.Namespace()
1875	ns1.Name = "altnamespace"
1876	ns2.Name = "should-only-be-displayed-for-root-ns"
1877
1878	// Create the allocs
1879	allocDefaultNS := mock.Alloc()
1880	allocAltNS := mock.Alloc()
1881	allocAltNS.Namespace = ns1.Name
1882	allocOtherNS := mock.Alloc()
1883	allocOtherNS.Namespace = ns2.Name
1884
1885	node := mock.Node()
1886	allocDefaultNS.NodeID = node.ID
1887	allocAltNS.NodeID = node.ID
1888	allocOtherNS.NodeID = node.ID
1889	state := s1.fsm.State()
1890	assert.Nil(state.UpsertNamespaces(1, []*structs.Namespace{ns1, ns2}), "UpsertNamespaces")
1891	assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 2, node), "UpsertNode")
1892	assert.Nil(state.UpsertJobSummary(3, mock.JobSummary(allocDefaultNS.JobID)), "UpsertJobSummary")
1893	assert.Nil(state.UpsertJobSummary(4, mock.JobSummary(allocAltNS.JobID)), "UpsertJobSummary")
1894	assert.Nil(state.UpsertJobSummary(5, mock.JobSummary(allocOtherNS.JobID)), "UpsertJobSummary")
1895	allocs := []*structs.Allocation{allocDefaultNS, allocAltNS, allocOtherNS}
1896	assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 6, allocs), "UpsertAllocs")
1897
1898	// Create the namespace policy and tokens
1899	validDefaultToken := mock.CreatePolicyAndToken(t, state, 1001, "test-default-valid", mock.NodePolicy(acl.PolicyRead)+
1900		mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
1901	validNoNSToken := mock.CreatePolicyAndToken(t, state, 1003, "test-alt-valid", mock.NodePolicy(acl.PolicyRead))
1902	invalidToken := mock.CreatePolicyAndToken(t, state, 1004, "test-invalid",
1903		mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
1904
1905	// Lookup the node without a token and expect failure
1906	req := &structs.NodeSpecificRequest{
1907		NodeID:       node.ID,
1908		QueryOptions: structs.QueryOptions{Region: "global"},
1909	}
1910	{
1911		var resp structs.NodeAllocsResponse
1912		err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp)
1913		assert.NotNil(err, "RPC")
1914		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1915	}
1916
1917	// Try with a valid token for the default namespace
1918	req.AuthToken = validDefaultToken.SecretID
1919	{
1920		var resp structs.NodeAllocsResponse
1921		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC")
1922		assert.Len(resp.Allocs, 1)
1923		assert.Equal(allocDefaultNS.ID, resp.Allocs[0].ID)
1924	}
1925
1926	// Try with a valid token for a namespace with no allocs on this node
1927	req.AuthToken = validNoNSToken.SecretID
1928	{
1929		var resp structs.NodeAllocsResponse
1930		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC")
1931		assert.Len(resp.Allocs, 0)
1932	}
1933
1934	// Try with a invalid token
1935	req.AuthToken = invalidToken.SecretID
1936	{
1937		var resp structs.NodeAllocsResponse
1938		err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp)
1939		assert.NotNil(err, "RPC")
1940		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
1941	}
1942
1943	// Try with a root token
1944	req.AuthToken = root.SecretID
1945	{
1946		var resp structs.NodeAllocsResponse
1947		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC")
1948		assert.Len(resp.Allocs, 3)
1949		for _, alloc := range resp.Allocs {
1950			switch alloc.ID {
1951			case allocDefaultNS.ID, allocAltNS.ID, allocOtherNS.ID:
1952				// expected
1953			default:
1954				t.Errorf("unexpected alloc %q for namespace %q", alloc.ID, alloc.Namespace)
1955			}
1956		}
1957	}
1958}
1959
1960func TestClientEndpoint_GetClientAllocs(t *testing.T) {
1961	t.Parallel()
1962	require := require.New(t)
1963
1964	s1, cleanupS1 := TestServer(t, nil)
1965	defer cleanupS1()
1966	codec := rpcClient(t, s1)
1967	testutil.WaitForLeader(t, s1.RPC)
1968
1969	// Check that we have no client connections
1970	require.Empty(s1.connectedNodes())
1971
1972	// Create the register request
1973	node := mock.Node()
1974	state := s1.fsm.State()
1975	require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 98, node))
1976
1977	// Inject fake evaluations
1978	alloc := mock.Alloc()
1979	alloc.NodeID = node.ID
1980	state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
1981	err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
1982	if err != nil {
1983		t.Fatalf("err: %v", err)
1984	}
1985
1986	// Lookup the allocs
1987	get := &structs.NodeSpecificRequest{
1988		NodeID:       node.ID,
1989		SecretID:     node.SecretID,
1990		QueryOptions: structs.QueryOptions{Region: "global"},
1991	}
1992	var resp2 structs.NodeClientAllocsResponse
1993	if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp2); err != nil {
1994		t.Fatalf("err: %v", err)
1995	}
1996	if resp2.Index != 100 {
1997		t.Fatalf("Bad index: %d %d", resp2.Index, 100)
1998	}
1999
2000	if len(resp2.Allocs) != 1 || resp2.Allocs[alloc.ID] != 100 {
2001		t.Fatalf("bad: %#v", resp2.Allocs)
2002	}
2003
2004	// Check that we have the client connections
2005	nodes := s1.connectedNodes()
2006	require.Len(nodes, 1)
2007	require.Contains(nodes, node.ID)
2008
2009	// Lookup node with bad SecretID
2010	get.SecretID = "foobarbaz"
2011	var resp3 structs.NodeClientAllocsResponse
2012	err = msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp3)
2013	if err == nil || !strings.Contains(err.Error(), "does not match") {
2014		t.Fatalf("err: %v", err)
2015	}
2016
2017	// Lookup non-existing node
2018	get.NodeID = uuid.Generate()
2019	var resp4 structs.NodeClientAllocsResponse
2020	if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp4); err != nil {
2021		t.Fatalf("err: %v", err)
2022	}
2023	if resp4.Index != 100 {
2024		t.Fatalf("Bad index: %d %d", resp3.Index, 100)
2025	}
2026	if len(resp4.Allocs) != 0 {
2027		t.Fatalf("unexpected node %#v", resp3.Allocs)
2028	}
2029
2030	// Close the connection and check that we remove the client connections
2031	require.Nil(codec.Close())
2032	testutil.WaitForResult(func() (bool, error) {
2033		nodes := s1.connectedNodes()
2034		return len(nodes) == 0, nil
2035	}, func(err error) {
2036		t.Fatalf("should have no clients")
2037	})
2038}
2039
2040func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
2041	t.Parallel()
2042
2043	s1, cleanupS1 := TestServer(t, nil)
2044	defer cleanupS1()
2045	codec := rpcClient(t, s1)
2046	testutil.WaitForLeader(t, s1.RPC)
2047
2048	// Create the register request
2049	node := mock.Node()
2050	reg := &structs.NodeRegisterRequest{
2051		Node:         node,
2052		WriteRequest: structs.WriteRequest{Region: "global"},
2053	}
2054
2055	// Fetch the response
2056	var resp structs.GenericResponse
2057	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
2058		t.Fatalf("err: %v", err)
2059	}
2060	node.CreateIndex = resp.Index
2061	node.ModifyIndex = resp.Index
2062
2063	// Inject fake evaluations async
2064	now := time.Now().UTC().UnixNano()
2065	alloc := mock.Alloc()
2066	alloc.NodeID = node.ID
2067	alloc.ModifyTime = now
2068	state := s1.fsm.State()
2069	state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
2070	start := time.Now()
2071	time.AfterFunc(100*time.Millisecond, func() {
2072		err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
2073		if err != nil {
2074			t.Fatalf("err: %v", err)
2075		}
2076	})
2077
2078	// Lookup the allocs in a blocking query
2079	req := &structs.NodeSpecificRequest{
2080		NodeID:   node.ID,
2081		SecretID: node.SecretID,
2082		QueryOptions: structs.QueryOptions{
2083			Region:        "global",
2084			MinQueryIndex: 50,
2085			MaxQueryTime:  time.Second,
2086		},
2087	}
2088	var resp2 structs.NodeClientAllocsResponse
2089	if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2); err != nil {
2090		t.Fatalf("err: %v", err)
2091	}
2092
2093	// Should block at least 100ms
2094	if time.Since(start) < 100*time.Millisecond {
2095		t.Fatalf("too fast")
2096	}
2097
2098	if resp2.Index != 100 {
2099		t.Fatalf("Bad index: %d %d", resp2.Index, 100)
2100	}
2101
2102	if len(resp2.Allocs) != 1 || resp2.Allocs[alloc.ID] != 100 {
2103		t.Fatalf("bad: %#v", resp2.Allocs)
2104	}
2105
2106	iter, err := state.AllocsByIDPrefix(nil, structs.DefaultNamespace, alloc.ID)
2107	if err != nil {
2108		t.Fatalf("err: %v", err)
2109	}
2110
2111	getAllocs := func(iter memdb.ResultIterator) []*structs.Allocation {
2112		var allocs []*structs.Allocation
2113		for {
2114			raw := iter.Next()
2115			if raw == nil {
2116				break
2117			}
2118			allocs = append(allocs, raw.(*structs.Allocation))
2119		}
2120		return allocs
2121	}
2122	out := getAllocs(iter)
2123
2124	if len(out) != 1 {
2125		t.Fatalf("Expected to get one allocation but got:%v", out)
2126	}
2127
2128	if out[0].ModifyTime != now {
2129		t.Fatalf("Invalid modify time %v", out[0].ModifyTime)
2130	}
2131
2132	// Alloc updates fire watches
2133	time.AfterFunc(100*time.Millisecond, func() {
2134		allocUpdate := mock.Alloc()
2135		allocUpdate.NodeID = alloc.NodeID
2136		allocUpdate.ID = alloc.ID
2137		allocUpdate.ClientStatus = structs.AllocClientStatusRunning
2138		state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID))
2139		err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{allocUpdate})
2140		if err != nil {
2141			t.Fatalf("err: %v", err)
2142		}
2143	})
2144
2145	req.QueryOptions.MinQueryIndex = 150
2146	var resp3 structs.NodeClientAllocsResponse
2147	if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3); err != nil {
2148		t.Fatalf("err: %v", err)
2149	}
2150
2151	if time.Since(start) < 100*time.Millisecond {
2152		t.Fatalf("too fast")
2153	}
2154	if resp3.Index != 200 {
2155		t.Fatalf("Bad index: %d %d", resp3.Index, 200)
2156	}
2157	if len(resp3.Allocs) != 1 || resp3.Allocs[alloc.ID] != 200 {
2158		t.Fatalf("bad: %#v", resp3.Allocs)
2159	}
2160}
2161
2162func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) {
2163	t.Parallel()
2164	assert := assert.New(t)
2165
2166	s1, cleanupS1 := TestServer(t, nil)
2167	defer cleanupS1()
2168	codec := rpcClient(t, s1)
2169	testutil.WaitForLeader(t, s1.RPC)
2170
2171	// Create the register request
2172	node := mock.Node()
2173	reg := &structs.NodeRegisterRequest{
2174		Node:         node,
2175		WriteRequest: structs.WriteRequest{Region: "global"},
2176	}
2177
2178	// Fetch the response
2179	var resp structs.GenericResponse
2180	assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
2181	node.CreateIndex = resp.Index
2182	node.ModifyIndex = resp.Index
2183
2184	// Inject fake allocations async
2185	alloc1 := mock.Alloc()
2186	alloc1.NodeID = node.ID
2187	alloc2 := mock.Alloc()
2188	alloc2.NodeID = node.ID
2189	state := s1.fsm.State()
2190	state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
2191	start := time.Now()
2192	time.AfterFunc(100*time.Millisecond, func() {
2193		assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc1, alloc2}))
2194	})
2195
2196	// Lookup the allocs in a blocking query
2197	req := &structs.NodeSpecificRequest{
2198		NodeID:   node.ID,
2199		SecretID: node.SecretID,
2200		QueryOptions: structs.QueryOptions{
2201			Region:        "global",
2202			MinQueryIndex: 50,
2203			MaxQueryTime:  time.Second,
2204		},
2205	}
2206	var resp2 structs.NodeClientAllocsResponse
2207	assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2))
2208
2209	// Should block at least 100ms
2210	if time.Since(start) < 100*time.Millisecond {
2211		t.Fatalf("too fast")
2212	}
2213
2214	assert.EqualValues(100, resp2.Index)
2215	if assert.Len(resp2.Allocs, 2) {
2216		assert.EqualValues(100, resp2.Allocs[alloc1.ID])
2217	}
2218
2219	// Delete an allocation
2220	time.AfterFunc(100*time.Millisecond, func() {
2221		assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID}))
2222	})
2223
2224	req.QueryOptions.MinQueryIndex = 150
2225	var resp3 structs.NodeClientAllocsResponse
2226	assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3))
2227
2228	if time.Since(start) < 100*time.Millisecond {
2229		t.Fatalf("too fast")
2230	}
2231	assert.EqualValues(200, resp3.Index)
2232	if assert.Len(resp3.Allocs, 1) {
2233		assert.EqualValues(100, resp3.Allocs[alloc1.ID])
2234	}
2235}
2236
2237// A MigrateToken should not be created if an allocation shares the same node
2238// with its previous allocation
2239func TestClientEndpoint_GetClientAllocs_WithoutMigrateTokens(t *testing.T) {
2240	t.Parallel()
2241	assert := assert.New(t)
2242
2243	s1, cleanupS1 := TestServer(t, nil)
2244	defer cleanupS1()
2245	codec := rpcClient(t, s1)
2246	testutil.WaitForLeader(t, s1.RPC)
2247
2248	// Create the register request
2249	node := mock.Node()
2250	reg := &structs.NodeRegisterRequest{
2251		Node:         node,
2252		WriteRequest: structs.WriteRequest{Region: "global"},
2253	}
2254
2255	// Fetch the response
2256	var resp structs.GenericResponse
2257	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
2258		t.Fatalf("err: %v", err)
2259	}
2260	node.CreateIndex = resp.Index
2261	node.ModifyIndex = resp.Index
2262
2263	// Inject fake evaluations
2264	prevAlloc := mock.Alloc()
2265	prevAlloc.NodeID = node.ID
2266	alloc := mock.Alloc()
2267	alloc.NodeID = node.ID
2268	alloc.PreviousAllocation = prevAlloc.ID
2269	alloc.DesiredStatus = structs.AllocClientStatusComplete
2270	state := s1.fsm.State()
2271	state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
2272	err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{prevAlloc, alloc})
2273	assert.Nil(err)
2274
2275	// Lookup the allocs
2276	get := &structs.NodeSpecificRequest{
2277		NodeID:       node.ID,
2278		SecretID:     node.SecretID,
2279		QueryOptions: structs.QueryOptions{Region: "global"},
2280	}
2281	var resp2 structs.NodeClientAllocsResponse
2282
2283	err = msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp2)
2284	assert.Nil(err)
2285
2286	assert.Equal(uint64(100), resp2.Index)
2287	assert.Equal(2, len(resp2.Allocs))
2288	assert.Equal(uint64(100), resp2.Allocs[alloc.ID])
2289	assert.Equal(0, len(resp2.MigrateTokens))
2290}
2291
2292func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
2293	t.Parallel()
2294
2295	s1, cleanupS1 := TestServer(t, nil)
2296	defer cleanupS1()
2297	codec := rpcClient(t, s1)
2298	testutil.WaitForLeader(t, s1.RPC)
2299
2300	// Create the register request
2301	node := mock.Node()
2302	reg := &structs.NodeRegisterRequest{
2303		Node:         node,
2304		WriteRequest: structs.WriteRequest{Region: "global"},
2305	}
2306
2307	// Fetch the response
2308	var resp structs.GenericResponse
2309	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
2310		t.Fatalf("err: %v", err)
2311	}
2312	node.CreateIndex = resp.Index
2313	node.ModifyIndex = resp.Index
2314
2315	// Inject fake evaluations async
2316	alloc := mock.Alloc()
2317	alloc.NodeID = node.ID
2318	state := s1.fsm.State()
2319	state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
2320	start := time.Now()
2321	time.AfterFunc(100*time.Millisecond, func() {
2322		err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
2323		if err != nil {
2324			t.Fatalf("err: %v", err)
2325		}
2326	})
2327
2328	// Lookup the allocs in a blocking query
2329	req := &structs.NodeSpecificRequest{
2330		NodeID: node.ID,
2331		QueryOptions: structs.QueryOptions{
2332			Region:        "global",
2333			MinQueryIndex: 50,
2334			MaxQueryTime:  time.Second,
2335		},
2336	}
2337	var resp2 structs.NodeAllocsResponse
2338	if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp2); err != nil {
2339		t.Fatalf("err: %v", err)
2340	}
2341
2342	// Should block at least 100ms
2343	if time.Since(start) < 100*time.Millisecond {
2344		t.Fatalf("too fast")
2345	}
2346
2347	if resp2.Index != 100 {
2348		t.Fatalf("Bad index: %d %d", resp2.Index, 100)
2349	}
2350
2351	if len(resp2.Allocs) != 1 || resp2.Allocs[0].ID != alloc.ID {
2352		t.Fatalf("bad: %#v", resp2.Allocs)
2353	}
2354
2355	// Alloc updates fire watches
2356	time.AfterFunc(100*time.Millisecond, func() {
2357		allocUpdate := mock.Alloc()
2358		allocUpdate.NodeID = alloc.NodeID
2359		allocUpdate.ID = alloc.ID
2360		allocUpdate.ClientStatus = structs.AllocClientStatusRunning
2361		state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID))
2362		err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 200, []*structs.Allocation{allocUpdate})
2363		if err != nil {
2364			t.Fatalf("err: %v", err)
2365		}
2366	})
2367
2368	req.QueryOptions.MinQueryIndex = 150
2369	var resp3 structs.NodeAllocsResponse
2370	if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp3); err != nil {
2371		t.Fatalf("err: %v", err)
2372	}
2373
2374	if time.Since(start) < 100*time.Millisecond {
2375		t.Fatalf("too fast")
2376	}
2377	if resp3.Index != 200 {
2378		t.Fatalf("Bad index: %d %d", resp3.Index, 200)
2379	}
2380	if len(resp3.Allocs) != 1 || resp3.Allocs[0].ClientStatus != structs.AllocClientStatusRunning {
2381		t.Fatalf("bad: %#v", resp3.Allocs[0])
2382	}
2383}
2384
2385func TestClientEndpoint_UpdateAlloc(t *testing.T) {
2386	t.Parallel()
2387
2388	s1, cleanupS1 := TestServer(t, func(c *Config) {
2389		// Disabling scheduling in this test so that we can
2390		// ensure that the state store doesn't accumulate more evals
2391		// than what we expect the unit test to add
2392		c.NumSchedulers = 0
2393	})
2394
2395	defer cleanupS1()
2396	codec := rpcClient(t, s1)
2397	testutil.WaitForLeader(t, s1.RPC)
2398	require := require.New(t)
2399
2400	// Create the register request
2401	node := mock.Node()
2402	reg := &structs.NodeRegisterRequest{
2403		Node:         node,
2404		WriteRequest: structs.WriteRequest{Region: "global"},
2405	}
2406
2407	// Fetch the response
2408	var resp structs.GenericResponse
2409	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
2410		t.Fatalf("err: %v", err)
2411	}
2412
2413	state := s1.fsm.State()
2414	// Inject mock job
2415	job := mock.Job()
2416	job.ID = "mytestjob"
2417	err := state.UpsertJob(structs.MsgTypeTestSetup, 101, job)
2418	require.Nil(err)
2419
2420	// Inject fake allocations
2421	alloc := mock.Alloc()
2422	alloc.JobID = job.ID
2423	alloc.NodeID = node.ID
2424	err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
2425	require.Nil(err)
2426	alloc.TaskGroup = job.TaskGroups[0].Name
2427
2428	alloc2 := mock.Alloc()
2429	alloc2.JobID = job.ID
2430	alloc2.NodeID = node.ID
2431	err = state.UpsertJobSummary(99, mock.JobSummary(alloc2.JobID))
2432	require.Nil(err)
2433	alloc2.TaskGroup = job.TaskGroups[0].Name
2434
2435	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc, alloc2})
2436	require.Nil(err)
2437
2438	// Attempt updates of more than one alloc for the same job
2439	clientAlloc1 := new(structs.Allocation)
2440	*clientAlloc1 = *alloc
2441	clientAlloc1.ClientStatus = structs.AllocClientStatusFailed
2442
2443	clientAlloc2 := new(structs.Allocation)
2444	*clientAlloc2 = *alloc2
2445	clientAlloc2.ClientStatus = structs.AllocClientStatusFailed
2446
2447	// Update the alloc
2448	update := &structs.AllocUpdateRequest{
2449		Alloc:        []*structs.Allocation{clientAlloc1, clientAlloc2},
2450		WriteRequest: structs.WriteRequest{Region: "global"},
2451	}
2452	var resp2 structs.NodeAllocsResponse
2453	start := time.Now()
2454	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2)
2455	require.Nil(err)
2456	require.NotEqual(uint64(0), resp2.Index)
2457
2458	if diff := time.Since(start); diff < batchUpdateInterval {
2459		t.Fatalf("too fast: %v", diff)
2460	}
2461
2462	// Lookup the alloc
2463	ws := memdb.NewWatchSet()
2464	out, err := state.AllocByID(ws, alloc.ID)
2465	require.Nil(err)
2466	require.Equal(structs.AllocClientStatusFailed, out.ClientStatus)
2467	require.True(out.ModifyTime > 0)
2468
2469	// Assert that exactly one eval with TriggeredBy EvalTriggerRetryFailedAlloc exists
2470	evaluations, err := state.EvalsByJob(ws, job.Namespace, job.ID)
2471	require.Nil(err)
2472	require.True(len(evaluations) != 0)
2473	foundCount := 0
2474	for _, resultEval := range evaluations {
2475		if resultEval.TriggeredBy == structs.EvalTriggerRetryFailedAlloc && resultEval.WaitUntil.IsZero() {
2476			foundCount++
2477		}
2478	}
2479	require.Equal(1, foundCount, "Should create exactly one eval for failed allocs")
2480
2481}
2482
2483func TestClientEndpoint_BatchUpdate(t *testing.T) {
2484	t.Parallel()
2485
2486	s1, cleanupS1 := TestServer(t, nil)
2487	defer cleanupS1()
2488	codec := rpcClient(t, s1)
2489	testutil.WaitForLeader(t, s1.RPC)
2490
2491	// Create the register request
2492	node := mock.Node()
2493	reg := &structs.NodeRegisterRequest{
2494		Node:         node,
2495		WriteRequest: structs.WriteRequest{Region: "global"},
2496	}
2497
2498	// Fetch the response
2499	var resp structs.GenericResponse
2500	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
2501		t.Fatalf("err: %v", err)
2502	}
2503
2504	// Inject fake evaluations
2505	alloc := mock.Alloc()
2506	alloc.NodeID = node.ID
2507	state := s1.fsm.State()
2508	state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
2509	err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
2510	if err != nil {
2511		t.Fatalf("err: %v", err)
2512	}
2513
2514	// Attempt update
2515	clientAlloc := new(structs.Allocation)
2516	*clientAlloc = *alloc
2517	clientAlloc.ClientStatus = structs.AllocClientStatusFailed
2518
2519	// Call to do the batch update
2520	bf := structs.NewBatchFuture()
2521	endpoint := s1.staticEndpoints.Node
2522	endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc}, nil)
2523	if err := bf.Wait(); err != nil {
2524		t.Fatalf("err: %v", err)
2525	}
2526	if bf.Index() == 0 {
2527		t.Fatalf("Bad index: %d", bf.Index())
2528	}
2529
2530	// Lookup the alloc
2531	ws := memdb.NewWatchSet()
2532	out, err := state.AllocByID(ws, alloc.ID)
2533	if err != nil {
2534		t.Fatalf("err: %v", err)
2535	}
2536	if out.ClientStatus != structs.AllocClientStatusFailed {
2537		t.Fatalf("Bad: %#v", out)
2538	}
2539}
2540
2541func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
2542	t.Parallel()
2543
2544	s1, cleanupS1 := TestServer(t, nil)
2545	defer cleanupS1()
2546	codec := rpcClient(t, s1)
2547	testutil.WaitForLeader(t, s1.RPC)
2548
2549	// Create the register request
2550	node := mock.Node()
2551	reg := &structs.NodeRegisterRequest{
2552		Node:         node,
2553		WriteRequest: structs.WriteRequest{Region: "global"},
2554	}
2555
2556	// Fetch the response
2557	var resp structs.GenericResponse
2558	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
2559		t.Fatalf("err: %v", err)
2560	}
2561
2562	// Swap the servers Vault Client
2563	tvc := &TestVaultClient{}
2564	s1.vault = tvc
2565
2566	// Inject fake allocation and vault accessor
2567	alloc := mock.Alloc()
2568	alloc.NodeID = node.ID
2569	state := s1.fsm.State()
2570	state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
2571	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}); err != nil {
2572		t.Fatalf("err: %v", err)
2573	}
2574
2575	va := mock.VaultAccessor()
2576	va.NodeID = node.ID
2577	va.AllocID = alloc.ID
2578	if err := state.UpsertVaultAccessor(101, []*structs.VaultAccessor{va}); err != nil {
2579		t.Fatalf("err: %v", err)
2580	}
2581
2582	// Inject mock job
2583	job := mock.Job()
2584	job.ID = alloc.JobID
2585	err := state.UpsertJob(structs.MsgTypeTestSetup, 101, job)
2586	if err != nil {
2587		t.Fatalf("err: %v", err)
2588	}
2589
2590	// Attempt update
2591	clientAlloc := new(structs.Allocation)
2592	*clientAlloc = *alloc
2593	clientAlloc.ClientStatus = structs.AllocClientStatusFailed
2594
2595	// Update the alloc
2596	update := &structs.AllocUpdateRequest{
2597		Alloc:        []*structs.Allocation{clientAlloc},
2598		WriteRequest: structs.WriteRequest{Region: "global"},
2599	}
2600	var resp2 structs.NodeAllocsResponse
2601	start := time.Now()
2602	if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2); err != nil {
2603		t.Fatalf("err: %v", err)
2604	}
2605	if resp2.Index == 0 {
2606		t.Fatalf("Bad index: %d", resp2.Index)
2607	}
2608	if diff := time.Since(start); diff < batchUpdateInterval {
2609		t.Fatalf("too fast: %v", diff)
2610	}
2611
2612	// Lookup the alloc
2613	ws := memdb.NewWatchSet()
2614	out, err := state.AllocByID(ws, alloc.ID)
2615	if err != nil {
2616		t.Fatalf("err: %v", err)
2617	}
2618	if out.ClientStatus != structs.AllocClientStatusFailed {
2619		t.Fatalf("Bad: %#v", out)
2620	}
2621
2622	if l := len(tvc.RevokedTokens); l != 1 {
2623		t.Fatalf("Deregister revoked %d tokens; want 1", l)
2624	}
2625}
2626
2627func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
2628	t.Parallel()
2629
2630	s1, cleanupS1 := TestServer(t, nil)
2631	defer cleanupS1()
2632	testutil.WaitForLeader(t, s1.RPC)
2633
2634	// Inject fake evaluations
2635	alloc := mock.Alloc()
2636	state := s1.fsm.State()
2637	state.UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
2638	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 2, []*structs.Allocation{alloc}); err != nil {
2639		t.Fatalf("err: %v", err)
2640	}
2641
2642	// Inject a fake system job.
2643	job := mock.SystemJob()
2644	if err := state.UpsertJob(structs.MsgTypeTestSetup, 3, job); err != nil {
2645		t.Fatalf("err: %v", err)
2646	}
2647
2648	// Create some evaluations
2649	ids, index, err := s1.staticEndpoints.Node.createNodeEvals(alloc.NodeID, 1)
2650	if err != nil {
2651		t.Fatalf("err: %v", err)
2652	}
2653	if index == 0 {
2654		t.Fatalf("bad: %d", index)
2655	}
2656	if len(ids) != 2 {
2657		t.Fatalf("bad: %s", ids)
2658	}
2659
2660	// Lookup the evaluations
2661	ws := memdb.NewWatchSet()
2662	evalByType := make(map[string]*structs.Evaluation, 2)
2663	for _, id := range ids {
2664		eval, err := state.EvalByID(ws, id)
2665		if err != nil {
2666			t.Fatalf("err: %v", err)
2667		}
2668		if eval == nil {
2669			t.Fatalf("expected eval")
2670		}
2671
2672		if old, ok := evalByType[eval.Type]; ok {
2673			t.Fatalf("multiple evals of the same type: %v and %v", old, eval)
2674		}
2675
2676		evalByType[eval.Type] = eval
2677	}
2678
2679	if len(evalByType) != 2 {
2680		t.Fatalf("Expected a service and system job; got %#v", evalByType)
2681	}
2682
2683	// Ensure the evals are correct.
2684	for schedType, eval := range evalByType {
2685		expPriority := alloc.Job.Priority
2686		expJobID := alloc.JobID
2687		if schedType == "system" {
2688			expPriority = job.Priority
2689			expJobID = job.ID
2690		}
2691
2692		t.Logf("checking eval: %v", pretty.Sprint(eval))
2693		require.Equal(t, index, eval.CreateIndex)
2694		require.Equal(t, structs.EvalTriggerNodeUpdate, eval.TriggeredBy)
2695		require.Equal(t, alloc.NodeID, eval.NodeID)
2696		require.Equal(t, uint64(1), eval.NodeModifyIndex)
2697		switch eval.Status {
2698		case structs.EvalStatusPending, structs.EvalStatusComplete:
2699			// success
2700		default:
2701			t.Fatalf("expected pending or complete, found %v", eval.Status)
2702		}
2703		require.Equal(t, expPriority, eval.Priority)
2704		require.Equal(t, expJobID, eval.JobID)
2705		require.NotZero(t, eval.CreateTime)
2706		require.NotZero(t, eval.ModifyTime)
2707	}
2708}
2709
2710func TestClientEndpoint_Evaluate(t *testing.T) {
2711	t.Parallel()
2712
2713	s1, cleanupS1 := TestServer(t, func(c *Config) {
2714		c.NumSchedulers = 0 // Prevent automatic dequeue
2715	})
2716	defer cleanupS1()
2717	codec := rpcClient(t, s1)
2718	testutil.WaitForLeader(t, s1.RPC)
2719
2720	// Inject fake evaluations
2721	alloc := mock.Alloc()
2722	node := mock.Node()
2723	node.ID = alloc.NodeID
2724	state := s1.fsm.State()
2725	err := state.UpsertNode(structs.MsgTypeTestSetup, 1, node)
2726	if err != nil {
2727		t.Fatalf("err: %v", err)
2728	}
2729	state.UpsertJobSummary(2, mock.JobSummary(alloc.JobID))
2730	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc})
2731	if err != nil {
2732		t.Fatalf("err: %v", err)
2733	}
2734
2735	// Re-evaluate
2736	req := &structs.NodeEvaluateRequest{
2737		NodeID:       alloc.NodeID,
2738		WriteRequest: structs.WriteRequest{Region: "global"},
2739	}
2740
2741	// Fetch the response
2742	var resp structs.NodeUpdateResponse
2743	if err := msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp); err != nil {
2744		t.Fatalf("err: %v", err)
2745	}
2746	if resp.Index == 0 {
2747		t.Fatalf("bad index: %d", resp.Index)
2748	}
2749
2750	// Create some evaluations
2751	ids := resp.EvalIDs
2752	if len(ids) != 1 {
2753		t.Fatalf("bad: %s", ids)
2754	}
2755
2756	// Lookup the evaluation
2757	ws := memdb.NewWatchSet()
2758	eval, err := state.EvalByID(ws, ids[0])
2759	if err != nil {
2760		t.Fatalf("err: %v", err)
2761	}
2762	if eval == nil {
2763		t.Fatalf("expected eval")
2764	}
2765	if eval.CreateIndex != resp.Index {
2766		t.Fatalf("index mis-match")
2767	}
2768
2769	if eval.Priority != alloc.Job.Priority {
2770		t.Fatalf("bad: %#v", eval)
2771	}
2772	if eval.Type != alloc.Job.Type {
2773		t.Fatalf("bad: %#v", eval)
2774	}
2775	if eval.TriggeredBy != structs.EvalTriggerNodeUpdate {
2776		t.Fatalf("bad: %#v", eval)
2777	}
2778	if eval.JobID != alloc.JobID {
2779		t.Fatalf("bad: %#v", eval)
2780	}
2781	if eval.NodeID != alloc.NodeID {
2782		t.Fatalf("bad: %#v", eval)
2783	}
2784	if eval.NodeModifyIndex != 1 {
2785		t.Fatalf("bad: %#v", eval)
2786	}
2787	if eval.Status != structs.EvalStatusPending {
2788		t.Fatalf("bad: %#v", eval)
2789	}
2790	if eval.CreateTime == 0 {
2791		t.Fatalf("CreateTime is unset: %#v", eval)
2792	}
2793	if eval.ModifyTime == 0 {
2794		t.Fatalf("ModifyTime is unset: %#v", eval)
2795	}
2796}
2797
2798func TestClientEndpoint_Evaluate_ACL(t *testing.T) {
2799	t.Parallel()
2800
2801	s1, root, cleanupS1 := TestACLServer(t, nil)
2802	defer cleanupS1()
2803	codec := rpcClient(t, s1)
2804	testutil.WaitForLeader(t, s1.RPC)
2805	assert := assert.New(t)
2806
2807	// Create the node with an alloc
2808	alloc := mock.Alloc()
2809	node := mock.Node()
2810	node.ID = alloc.NodeID
2811	state := s1.fsm.State()
2812
2813	assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode")
2814	assert.Nil(state.UpsertJobSummary(2, mock.JobSummary(alloc.JobID)), "UpsertJobSummary")
2815	assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc}), "UpsertAllocs")
2816
2817	// Create the policy and tokens
2818	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite))
2819	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))
2820
2821	// Re-evaluate without a token and expect failure
2822	req := &structs.NodeEvaluateRequest{
2823		NodeID:       alloc.NodeID,
2824		WriteRequest: structs.WriteRequest{Region: "global"},
2825	}
2826	{
2827		var resp structs.NodeUpdateResponse
2828		err := msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp)
2829		assert.NotNil(err, "RPC")
2830		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
2831	}
2832
2833	// Try with a valid token
2834	req.AuthToken = validToken.SecretID
2835	{
2836		var resp structs.NodeUpdateResponse
2837		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp), "RPC")
2838	}
2839
2840	// Try with a invalid token
2841	req.AuthToken = invalidToken.SecretID
2842	{
2843		var resp structs.NodeUpdateResponse
2844		err := msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp)
2845		assert.NotNil(err, "RPC")
2846		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
2847	}
2848
2849	// Try with a root token
2850	req.AuthToken = root.SecretID
2851	{
2852		var resp structs.NodeUpdateResponse
2853		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp), "RPC")
2854	}
2855}
2856
2857func TestClientEndpoint_ListNodes(t *testing.T) {
2858	t.Parallel()
2859
2860	s1, cleanupS1 := TestServer(t, nil)
2861	defer cleanupS1()
2862	codec := rpcClient(t, s1)
2863	testutil.WaitForLeader(t, s1.RPC)
2864
2865	// Create the register request
2866	node := mock.Node()
2867	node.HostVolumes = map[string]*structs.ClientHostVolumeConfig{
2868		"foo": {
2869			Name:     "foo",
2870			Path:     "/",
2871			ReadOnly: true,
2872		},
2873	}
2874	reg := &structs.NodeRegisterRequest{
2875		Node:         node,
2876		WriteRequest: structs.WriteRequest{Region: "global"},
2877	}
2878
2879	// Fetch the response
2880	var resp structs.GenericResponse
2881	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
2882		t.Fatalf("err: %v", err)
2883	}
2884	node.CreateIndex = resp.Index
2885	node.ModifyIndex = resp.Index
2886
2887	// Lookup the node
2888	get := &structs.NodeListRequest{
2889		QueryOptions: structs.QueryOptions{Region: "global"},
2890	}
2891	var resp2 structs.NodeListResponse
2892	if err := msgpackrpc.CallWithCodec(codec, "Node.List", get, &resp2); err != nil {
2893		t.Fatalf("err: %v", err)
2894	}
2895	if resp2.Index != resp.Index {
2896		t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index)
2897	}
2898
2899	require.Len(t, resp2.Nodes, 1)
2900	require.Equal(t, node.ID, resp2.Nodes[0].ID)
2901
2902	// #7344 - Assert HostVolumes are included in stub
2903	require.Equal(t, node.HostVolumes, resp2.Nodes[0].HostVolumes)
2904
2905	// #9055 - Assert Resources are *not* included by default
2906	require.Nil(t, resp2.Nodes[0].NodeResources)
2907	require.Nil(t, resp2.Nodes[0].ReservedResources)
2908
2909	// Lookup the node with prefix
2910	get = &structs.NodeListRequest{
2911		QueryOptions: structs.QueryOptions{Region: "global", Prefix: node.ID[:4]},
2912	}
2913	var resp3 structs.NodeListResponse
2914	if err := msgpackrpc.CallWithCodec(codec, "Node.List", get, &resp3); err != nil {
2915		t.Fatalf("err: %v", err)
2916	}
2917	if resp3.Index != resp.Index {
2918		t.Fatalf("Bad index: %d %d", resp3.Index, resp2.Index)
2919	}
2920
2921	if len(resp3.Nodes) != 1 {
2922		t.Fatalf("bad: %#v", resp3.Nodes)
2923	}
2924	if resp3.Nodes[0].ID != node.ID {
2925		t.Fatalf("bad: %#v", resp3.Nodes[0])
2926	}
2927}
2928
2929func TestClientEndpoint_ListNodes_Fields(t *testing.T) {
2930	t.Parallel()
2931
2932	s1, cleanupS1 := TestServer(t, nil)
2933	defer cleanupS1()
2934	codec := rpcClient(t, s1)
2935	testutil.WaitForLeader(t, s1.RPC)
2936
2937	// Create the register request
2938	node := mock.Node()
2939	reg := &structs.NodeRegisterRequest{
2940		Node:         node,
2941		WriteRequest: structs.WriteRequest{Region: "global"},
2942	}
2943
2944	// Fetch the response
2945	var resp structs.GenericResponse
2946	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
2947	node.CreateIndex = resp.Index
2948	node.ModifyIndex = resp.Index
2949
2950	// Lookup the node with fields
2951	get := &structs.NodeListRequest{
2952		QueryOptions: structs.QueryOptions{Region: "global"},
2953		Fields: &structs.NodeStubFields{
2954			Resources: true,
2955		},
2956	}
2957	var resp2 structs.NodeListResponse
2958	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.List", get, &resp2))
2959	require.Equal(t, resp.Index, resp2.Index)
2960	require.Len(t, resp2.Nodes, 1)
2961	require.Equal(t, node.ID, resp2.Nodes[0].ID)
2962	require.NotNil(t, resp2.Nodes[0].NodeResources)
2963	require.NotNil(t, resp2.Nodes[0].ReservedResources)
2964}
2965
2966func TestClientEndpoint_ListNodes_ACL(t *testing.T) {
2967	t.Parallel()
2968
2969	s1, root, cleanupS1 := TestACLServer(t, nil)
2970	defer cleanupS1()
2971	codec := rpcClient(t, s1)
2972	testutil.WaitForLeader(t, s1.RPC)
2973	assert := assert.New(t)
2974
2975	// Create the node
2976	node := mock.Node()
2977	state := s1.fsm.State()
2978	assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode")
2979
2980	// Create the namespace policy and tokens
2981	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyRead))
2982	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyDeny))
2983
2984	// Lookup the node without a token and expect failure
2985	req := &structs.NodeListRequest{
2986		QueryOptions: structs.QueryOptions{Region: "global"},
2987	}
2988	{
2989		var resp structs.NodeListResponse
2990		err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp)
2991		assert.NotNil(err, "RPC")
2992		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
2993	}
2994
2995	// Try with a valid token
2996	req.AuthToken = validToken.SecretID
2997	{
2998		var resp structs.NodeListResponse
2999		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp), "RPC")
3000		assert.Equal(node.ID, resp.Nodes[0].ID)
3001	}
3002
3003	// Try with a invalid token
3004	req.AuthToken = invalidToken.SecretID
3005	{
3006		var resp structs.NodeListResponse
3007		err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp)
3008		assert.NotNil(err, "RPC")
3009		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
3010	}
3011
3012	// Try with a root token
3013	req.AuthToken = root.SecretID
3014	{
3015		var resp structs.NodeListResponse
3016		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp), "RPC")
3017		assert.Equal(node.ID, resp.Nodes[0].ID)
3018	}
3019}
3020
3021func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
3022	t.Parallel()
3023
3024	s1, cleanupS1 := TestServer(t, nil)
3025	defer cleanupS1()
3026	state := s1.fsm.State()
3027	codec := rpcClient(t, s1)
3028	testutil.WaitForLeader(t, s1.RPC)
3029
3030	// Disable drainer to prevent drain from completing during test
3031	s1.nodeDrainer.SetEnabled(false, nil)
3032
3033	// Create the node
3034	node := mock.Node()
3035
3036	// Node upsert triggers watches
3037	errCh := make(chan error, 1)
3038	timer := time.AfterFunc(100*time.Millisecond, func() {
3039		errCh <- state.UpsertNode(structs.MsgTypeTestSetup, 2, node)
3040	})
3041	defer timer.Stop()
3042
3043	req := &structs.NodeListRequest{
3044		QueryOptions: structs.QueryOptions{
3045			Region:        "global",
3046			MinQueryIndex: 1,
3047		},
3048	}
3049	start := time.Now()
3050	var resp structs.NodeListResponse
3051	if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp); err != nil {
3052		t.Fatalf("err: %v", err)
3053	}
3054
3055	if err := <-errCh; err != nil {
3056		t.Fatalf("error from timer: %v", err)
3057	}
3058
3059	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
3060		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
3061	}
3062	if resp.Index != 2 {
3063		t.Fatalf("Bad index: %d %d", resp.Index, 2)
3064	}
3065	if len(resp.Nodes) != 1 || resp.Nodes[0].ID != node.ID {
3066		t.Fatalf("bad: %#v", resp.Nodes)
3067	}
3068
3069	// Node drain updates trigger watches.
3070	time.AfterFunc(100*time.Millisecond, func() {
3071		s := &structs.DrainStrategy{
3072			DrainSpec: structs.DrainSpec{
3073				Deadline: 10 * time.Second,
3074			},
3075		}
3076		errCh <- state.UpdateNodeDrain(structs.MsgTypeTestSetup, 3, node.ID, s, false, 0, nil, nil, "")
3077	})
3078
3079	req.MinQueryIndex = 2
3080	var resp2 structs.NodeListResponse
3081	start = time.Now()
3082	if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp2); err != nil {
3083		t.Fatalf("err: %v", err)
3084	}
3085
3086	if err := <-errCh; err != nil {
3087		t.Fatalf("error from timer: %v", err)
3088	}
3089
3090	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
3091		t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
3092	}
3093	if resp2.Index != 3 {
3094		t.Fatalf("Bad index: %d %d", resp2.Index, 3)
3095	}
3096	if len(resp2.Nodes) != 1 || !resp2.Nodes[0].Drain {
3097		t.Fatalf("bad: %#v", resp2.Nodes)
3098	}
3099
3100	// Node status update triggers watches
3101	time.AfterFunc(100*time.Millisecond, func() {
3102		errCh <- state.UpdateNodeStatus(structs.MsgTypeTestSetup, 40, node.ID, structs.NodeStatusDown, 0, nil)
3103	})
3104
3105	req.MinQueryIndex = 38
3106	var resp3 structs.NodeListResponse
3107	start = time.Now()
3108	if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp3); err != nil {
3109		t.Fatalf("err: %v", err)
3110	}
3111
3112	if err := <-errCh; err != nil {
3113		t.Fatalf("error from timer: %v", err)
3114	}
3115
3116	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
3117		t.Fatalf("should block (returned in %s) %#v", elapsed, resp3)
3118	}
3119	if resp3.Index != 40 {
3120		t.Fatalf("Bad index: %d %d", resp3.Index, 40)
3121	}
3122	if len(resp3.Nodes) != 1 || resp3.Nodes[0].Status != structs.NodeStatusDown {
3123		t.Fatalf("bad: %#v", resp3.Nodes)
3124	}
3125
3126	// Node delete triggers watches.
3127	time.AfterFunc(100*time.Millisecond, func() {
3128		errCh <- state.DeleteNode(structs.MsgTypeTestSetup, 50, []string{node.ID})
3129	})
3130
3131	req.MinQueryIndex = 45
3132	var resp4 structs.NodeListResponse
3133	start = time.Now()
3134	if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp4); err != nil {
3135		t.Fatalf("err: %v", err)
3136	}
3137
3138	if err := <-errCh; err != nil {
3139		t.Fatalf("error from timer: %v", err)
3140	}
3141
3142	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
3143		t.Fatalf("should block (returned in %s) %#v", elapsed, resp4)
3144	}
3145	if resp4.Index != 50 {
3146		t.Fatalf("Bad index: %d %d", resp4.Index, 50)
3147	}
3148	if len(resp4.Nodes) != 0 {
3149		t.Fatalf("bad: %#v", resp4.Nodes)
3150	}
3151}
3152
3153func TestClientEndpoint_DeriveVaultToken_Bad(t *testing.T) {
3154	t.Parallel()
3155
3156	s1, cleanupS1 := TestServer(t, nil)
3157	defer cleanupS1()
3158	state := s1.fsm.State()
3159	codec := rpcClient(t, s1)
3160	testutil.WaitForLeader(t, s1.RPC)
3161
3162	// Create the node
3163	node := mock.Node()
3164	if err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node); err != nil {
3165		t.Fatalf("err: %v", err)
3166	}
3167
3168	// Create an alloc
3169	alloc := mock.Alloc()
3170	task := alloc.Job.TaskGroups[0].Tasks[0]
3171	tasks := []string{task.Name}
3172	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc}); err != nil {
3173		t.Fatalf("err: %v", err)
3174	}
3175
3176	req := &structs.DeriveVaultTokenRequest{
3177		NodeID:   node.ID,
3178		SecretID: uuid.Generate(),
3179		AllocID:  alloc.ID,
3180		Tasks:    tasks,
3181		QueryOptions: structs.QueryOptions{
3182			Region: "global",
3183		},
3184	}
3185
3186	var resp structs.DeriveVaultTokenResponse
3187	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
3188		t.Fatalf("bad: %v", err)
3189	}
3190
3191	if resp.Error == nil || !strings.Contains(resp.Error.Error(), "SecretID mismatch") {
3192		t.Fatalf("Expected SecretID mismatch: %v", resp.Error)
3193	}
3194
3195	// Put the correct SecretID
3196	req.SecretID = node.SecretID
3197
3198	// Now we should get an error about the allocation not running on the node
3199	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
3200		t.Fatalf("bad: %v", err)
3201	}
3202	if resp.Error == nil || !strings.Contains(resp.Error.Error(), "not running on Node") {
3203		t.Fatalf("Expected not running on node error: %v", resp.Error)
3204	}
3205
3206	// Update to be running on the node
3207	alloc.NodeID = node.ID
3208	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 4, []*structs.Allocation{alloc}); err != nil {
3209		t.Fatalf("err: %v", err)
3210	}
3211
3212	// Now we should get an error about the job not needing any Vault secrets
3213	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
3214		t.Fatalf("bad: %v", err)
3215	}
3216	if resp.Error == nil || !strings.Contains(resp.Error.Error(), "does not require") {
3217		t.Fatalf("Expected no policies error: %v", resp.Error)
3218	}
3219
3220	// Update to be terminal
3221	alloc.DesiredStatus = structs.AllocDesiredStatusStop
3222	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 5, []*structs.Allocation{alloc}); err != nil {
3223		t.Fatalf("err: %v", err)
3224	}
3225
3226	// Now we should get an error about the job not needing any Vault secrets
3227	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
3228		t.Fatalf("bad: %v", err)
3229	}
3230	if resp.Error == nil || !strings.Contains(resp.Error.Error(), "terminal") {
3231		t.Fatalf("Expected terminal allocation error: %v", resp.Error)
3232	}
3233}
3234
3235func TestClientEndpoint_DeriveVaultToken(t *testing.T) {
3236	t.Parallel()
3237
3238	s1, cleanupS1 := TestServer(t, nil)
3239	defer cleanupS1()
3240	state := s1.fsm.State()
3241	codec := rpcClient(t, s1)
3242	testutil.WaitForLeader(t, s1.RPC)
3243
3244	// Enable vault and allow authenticated
3245	tr := true
3246	s1.config.VaultConfig.Enabled = &tr
3247	s1.config.VaultConfig.AllowUnauthenticated = &tr
3248
3249	// Replace the Vault Client on the server
3250	tvc := &TestVaultClient{}
3251	s1.vault = tvc
3252
3253	// Create the node
3254	node := mock.Node()
3255	if err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node); err != nil {
3256		t.Fatalf("err: %v", err)
3257	}
3258
3259	// Create an alloc an allocation that has vault policies required
3260	alloc := mock.Alloc()
3261	alloc.NodeID = node.ID
3262	task := alloc.Job.TaskGroups[0].Tasks[0]
3263	tasks := []string{task.Name}
3264	task.Vault = &structs.Vault{Policies: []string{"a", "b"}}
3265	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc}); err != nil {
3266		t.Fatalf("err: %v", err)
3267	}
3268
3269	// Return a secret for the task
3270	token := uuid.Generate()
3271	accessor := uuid.Generate()
3272	ttl := 10
3273	secret := &vapi.Secret{
3274		WrapInfo: &vapi.SecretWrapInfo{
3275			Token:           token,
3276			WrappedAccessor: accessor,
3277			TTL:             ttl,
3278		},
3279	}
3280	tvc.SetCreateTokenSecret(alloc.ID, task.Name, secret)
3281
3282	req := &structs.DeriveVaultTokenRequest{
3283		NodeID:   node.ID,
3284		SecretID: node.SecretID,
3285		AllocID:  alloc.ID,
3286		Tasks:    tasks,
3287		QueryOptions: structs.QueryOptions{
3288			Region: "global",
3289		},
3290	}
3291
3292	var resp structs.DeriveVaultTokenResponse
3293	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
3294		t.Fatalf("bad: %v", err)
3295	}
3296	if resp.Error != nil {
3297		t.Fatalf("bad: %v", resp.Error)
3298	}
3299
3300	// Check the state store and ensure that we created a VaultAccessor
3301	ws := memdb.NewWatchSet()
3302	va, err := state.VaultAccessor(ws, accessor)
3303	if err != nil {
3304		t.Fatalf("bad: %v", err)
3305	}
3306	if va == nil {
3307		t.Fatalf("bad: %v", va)
3308	}
3309
3310	if va.CreateIndex == 0 {
3311		t.Fatalf("bad: %v", va)
3312	}
3313
3314	va.CreateIndex = 0
3315	expected := &structs.VaultAccessor{
3316		AllocID:     alloc.ID,
3317		Task:        task.Name,
3318		NodeID:      alloc.NodeID,
3319		Accessor:    accessor,
3320		CreationTTL: ttl,
3321	}
3322
3323	if !reflect.DeepEqual(expected, va) {
3324		t.Fatalf("Got %#v; want %#v", va, expected)
3325	}
3326}
3327
3328func TestClientEndpoint_DeriveVaultToken_VaultError(t *testing.T) {
3329	t.Parallel()
3330
3331	s1, cleanupS1 := TestServer(t, nil)
3332	defer cleanupS1()
3333	state := s1.fsm.State()
3334	codec := rpcClient(t, s1)
3335	testutil.WaitForLeader(t, s1.RPC)
3336
3337	// Enable vault and allow authenticated
3338	tr := true
3339	s1.config.VaultConfig.Enabled = &tr
3340	s1.config.VaultConfig.AllowUnauthenticated = &tr
3341
3342	// Replace the Vault Client on the server
3343	tvc := &TestVaultClient{}
3344	s1.vault = tvc
3345
3346	// Create the node
3347	node := mock.Node()
3348	if err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node); err != nil {
3349		t.Fatalf("err: %v", err)
3350	}
3351
3352	// Create an alloc an allocation that has vault policies required
3353	alloc := mock.Alloc()
3354	alloc.NodeID = node.ID
3355	task := alloc.Job.TaskGroups[0].Tasks[0]
3356	tasks := []string{task.Name}
3357	task.Vault = &structs.Vault{Policies: []string{"a", "b"}}
3358	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc}); err != nil {
3359		t.Fatalf("err: %v", err)
3360	}
3361
3362	// Return an error when creating the token
3363	tvc.SetCreateTokenError(alloc.ID, task.Name,
3364		structs.NewRecoverableError(fmt.Errorf("recover"), true))
3365
3366	req := &structs.DeriveVaultTokenRequest{
3367		NodeID:   node.ID,
3368		SecretID: node.SecretID,
3369		AllocID:  alloc.ID,
3370		Tasks:    tasks,
3371		QueryOptions: structs.QueryOptions{
3372			Region: "global",
3373		},
3374	}
3375
3376	var resp structs.DeriveVaultTokenResponse
3377	err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp)
3378	if err != nil {
3379		t.Fatalf("bad: %v", err)
3380	}
3381	if resp.Error == nil || !resp.Error.IsRecoverable() {
3382		t.Fatalf("bad: %+v", resp.Error)
3383	}
3384}
3385
3386func TestClientEndpoint_taskUsesConnect(t *testing.T) {
3387	t.Parallel()
3388
3389	try := func(t *testing.T, task *structs.Task, exp bool) {
3390		result := taskUsesConnect(task)
3391		require.Equal(t, exp, result)
3392	}
3393
3394	t.Run("task uses connect", func(t *testing.T) {
3395		try(t, &structs.Task{
3396			// see nomad.newConnectSidecarTask for how this works
3397			Name: "connect-proxy-myservice",
3398			Kind: "connect-proxy:myservice",
3399		}, true)
3400	})
3401
3402	t.Run("task does not use connect", func(t *testing.T) {
3403		try(t, &structs.Task{
3404			Name: "mytask",
3405			Kind: "incorrect:mytask",
3406		}, false)
3407	})
3408
3409	t.Run("task does not exist", func(t *testing.T) {
3410		try(t, nil, false)
3411	})
3412}
3413
3414func TestClientEndpoint_tasksNotUsingConnect(t *testing.T) {
3415	t.Parallel()
3416
3417	taskGroup := &structs.TaskGroup{
3418		Name: "testgroup",
3419		Tasks: []*structs.Task{{
3420			Name: "connect-proxy-service1",
3421			Kind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"),
3422		}, {
3423			Name: "incorrect-task3",
3424			Kind: "incorrect:task3",
3425		}, {
3426			Name: "connect-proxy-service4",
3427			Kind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service4"),
3428		}, {
3429			Name: "incorrect-task5",
3430			Kind: "incorrect:task5",
3431		}, {
3432			Name: "task6",
3433			Kind: structs.NewTaskKind(structs.ConnectNativePrefix, "service6"),
3434		}},
3435	}
3436
3437	requestingTasks := []string{
3438		"connect-proxy-service1", // yes
3439		"task2",                  // does not exist
3440		"task3",                  // no
3441		"connect-proxy-service4", // yes
3442		"task5",                  // no
3443		"task6",                  // yes, native
3444	}
3445
3446	notConnect, usingConnect := connectTasks(taskGroup, requestingTasks)
3447
3448	notConnectExp := []string{"task2", "task3", "task5"}
3449	usingConnectExp := []connectTask{
3450		{TaskName: "connect-proxy-service1", TaskKind: "connect-proxy:service1"},
3451		{TaskName: "connect-proxy-service4", TaskKind: "connect-proxy:service4"},
3452		{TaskName: "task6", TaskKind: "connect-native:service6"},
3453	}
3454
3455	require.Equal(t, notConnectExp, notConnect)
3456	require.Equal(t, usingConnectExp, usingConnect)
3457}
3458
3459func mutateConnectJob(t *testing.T, job *structs.Job) {
3460	var jch jobConnectHook
3461	_, warnings, err := jch.Mutate(job)
3462	require.Empty(t, warnings)
3463	require.NoError(t, err)
3464}
3465
3466func TestClientEndpoint_DeriveSIToken(t *testing.T) {
3467	t.Parallel()
3468	r := require.New(t)
3469
3470	s1, cleanupS1 := TestServer(t, nil) // already sets consul mocks
3471	defer cleanupS1()
3472
3473	state := s1.fsm.State()
3474	codec := rpcClient(t, s1)
3475	testutil.WaitForLeader(t, s1.RPC)
3476
3477	// Set allow unauthenticated (no operator token required)
3478	s1.config.ConsulConfig.AllowUnauthenticated = helper.BoolToPtr(true)
3479
3480	// Create the node
3481	node := mock.Node()
3482	err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node)
3483	r.NoError(err)
3484
3485	// Create an alloc with a typical connect service (sidecar) defined
3486	alloc := mock.ConnectAlloc()
3487	alloc.NodeID = node.ID
3488	mutateConnectJob(t, alloc.Job) // appends sidecar task
3489	sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
3490
3491	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc})
3492	r.NoError(err)
3493
3494	request := &structs.DeriveSITokenRequest{
3495		NodeID:       node.ID,
3496		SecretID:     node.SecretID,
3497		AllocID:      alloc.ID,
3498		Tasks:        []string{sidecarTask.Name},
3499		QueryOptions: structs.QueryOptions{Region: "global"},
3500	}
3501
3502	var response structs.DeriveSITokenResponse
3503	err = msgpackrpc.CallWithCodec(codec, "Node.DeriveSIToken", request, &response)
3504	r.NoError(err)
3505	r.Nil(response.Error)
3506
3507	// Check the state store and ensure we created a Consul SI Token Accessor
3508	ws := memdb.NewWatchSet()
3509	accessors, err := state.SITokenAccessorsByNode(ws, node.ID)
3510	r.NoError(err)
3511	r.Equal(1, len(accessors))                                  // only asked for one
3512	r.Equal("connect-proxy-testconnect", accessors[0].TaskName) // set by the mock
3513	r.Equal(node.ID, accessors[0].NodeID)                       // should match
3514	r.Equal(alloc.ID, accessors[0].AllocID)                     // should match
3515	r.True(helper.IsUUID(accessors[0].AccessorID))              // should be set
3516	r.Greater(accessors[0].CreateIndex, uint64(3))              // more than 3rd
3517}
3518
3519func TestClientEndpoint_DeriveSIToken_ConsulError(t *testing.T) {
3520	t.Parallel()
3521	r := require.New(t)
3522
3523	s1, cleanupS1 := TestServer(t, nil)
3524	defer cleanupS1()
3525	state := s1.fsm.State()
3526	codec := rpcClient(t, s1)
3527	testutil.WaitForLeader(t, s1.RPC)
3528
3529	// Set allow unauthenticated (no operator token required)
3530	s1.config.ConsulConfig.AllowUnauthenticated = helper.BoolToPtr(true)
3531
3532	// Create the node
3533	node := mock.Node()
3534	err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node)
3535	r.NoError(err)
3536
3537	// Create an alloc with a typical connect service (sidecar) defined
3538	alloc := mock.ConnectAlloc()
3539	alloc.NodeID = node.ID
3540	mutateConnectJob(t, alloc.Job) // appends sidecar task
3541	sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
3542
3543	// rejigger the server to use a broken mock consul
3544	mockACLsAPI := consul.NewMockACLsAPI(s1.logger)
3545	mockACLsAPI.SetError(structs.NewRecoverableError(errors.New("consul recoverable error"), true))
3546	m := NewConsulACLsAPI(mockACLsAPI, s1.logger, nil)
3547	s1.consulACLs = m
3548
3549	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc})
3550	r.NoError(err)
3551
3552	request := &structs.DeriveSITokenRequest{
3553		NodeID:       node.ID,
3554		SecretID:     node.SecretID,
3555		AllocID:      alloc.ID,
3556		Tasks:        []string{sidecarTask.Name},
3557		QueryOptions: structs.QueryOptions{Region: "global"},
3558	}
3559
3560	var response structs.DeriveSITokenResponse
3561	err = msgpackrpc.CallWithCodec(codec, "Node.DeriveSIToken", request, &response)
3562	r.NoError(err)
3563	r.NotNil(response.Error)               // error should be set
3564	r.True(response.Error.IsRecoverable()) // and is recoverable
3565}
3566
3567func TestClientEndpoint_EmitEvents(t *testing.T) {
3568	t.Parallel()
3569	require := require.New(t)
3570
3571	s1, cleanupS1 := TestServer(t, nil)
3572	defer cleanupS1()
3573	state := s1.fsm.State()
3574	codec := rpcClient(t, s1)
3575	testutil.WaitForLeader(t, s1.RPC)
3576
3577	// create a node that we can register our event to
3578	node := mock.Node()
3579	err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node)
3580	require.Nil(err)
3581
3582	nodeEvent := &structs.NodeEvent{
3583		Message:   "Registration failed",
3584		Subsystem: "Server",
3585		Timestamp: time.Now(),
3586	}
3587
3588	nodeEvents := map[string][]*structs.NodeEvent{node.ID: {nodeEvent}}
3589	req := structs.EmitNodeEventsRequest{
3590		NodeEvents:   nodeEvents,
3591		WriteRequest: structs.WriteRequest{Region: "global"},
3592	}
3593
3594	var resp structs.GenericResponse
3595	err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvents", &req, &resp)
3596	require.Nil(err)
3597	require.NotEqual(uint64(0), resp.Index)
3598
3599	// Check for the node in the FSM
3600	ws := memdb.NewWatchSet()
3601	out, err := state.NodeByID(ws, node.ID)
3602	require.Nil(err)
3603	require.False(len(out.Events) < 2)
3604}
3605