1package nomad
2
3import (
4	"reflect"
5	"testing"
6	"time"
7
8	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
9	"github.com/stretchr/testify/assert"
10	"github.com/stretchr/testify/require"
11
12	"github.com/hashicorp/nomad/acl"
13	"github.com/hashicorp/nomad/helper"
14	"github.com/hashicorp/nomad/helper/uuid"
15	"github.com/hashicorp/nomad/nomad/mock"
16	"github.com/hashicorp/nomad/nomad/structs"
17	"github.com/hashicorp/nomad/testutil"
18)
19
20func TestAllocEndpoint_List(t *testing.T) {
21	t.Parallel()
22
23	s1, cleanupS1 := TestServer(t, nil)
24	defer cleanupS1()
25
26	codec := rpcClient(t, s1)
27	testutil.WaitForLeader(t, s1.RPC)
28
29	// Create the register request
30	alloc := mock.Alloc()
31	summary := mock.JobSummary(alloc.JobID)
32	state := s1.fsm.State()
33
34	if err := state.UpsertJobSummary(999, summary); err != nil {
35		t.Fatalf("err: %v", err)
36	}
37	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}); err != nil {
38		t.Fatalf("err: %v", err)
39	}
40
41	// Lookup the allocations
42	get := &structs.AllocListRequest{
43		QueryOptions: structs.QueryOptions{
44			Region:    "global",
45			Namespace: structs.DefaultNamespace,
46		},
47	}
48	var resp structs.AllocListResponse
49	if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp); err != nil {
50		t.Fatalf("err: %v", err)
51	}
52	if resp.Index != 1000 {
53		t.Fatalf("Bad index: %d %d", resp.Index, 1000)
54	}
55
56	if len(resp.Allocations) != 1 {
57		t.Fatalf("bad: %#v", resp.Allocations)
58	}
59	if resp.Allocations[0].ID != alloc.ID {
60		t.Fatalf("bad: %#v", resp.Allocations[0])
61	}
62
63	// Lookup the allocations by prefix
64	get = &structs.AllocListRequest{
65		QueryOptions: structs.QueryOptions{
66			Region:    "global",
67			Namespace: structs.DefaultNamespace,
68			Prefix:    alloc.ID[:4],
69		},
70	}
71
72	var resp2 structs.AllocListResponse
73	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp2))
74	require.Equal(t, uint64(1000), resp2.Index)
75	require.Len(t, resp2.Allocations, 1)
76	require.Equal(t, alloc.ID, resp2.Allocations[0].ID)
77}
78
79func TestAllocEndpoint_List_Fields(t *testing.T) {
80	t.Parallel()
81
82	s1, cleanupS1 := TestServer(t, nil)
83	defer cleanupS1()
84
85	codec := rpcClient(t, s1)
86	testutil.WaitForLeader(t, s1.RPC)
87
88	// Create a running alloc
89	alloc := mock.Alloc()
90	alloc.ClientStatus = structs.AllocClientStatusRunning
91	alloc.TaskStates = map[string]*structs.TaskState{
92		"web": {
93			State:     structs.TaskStateRunning,
94			StartedAt: time.Now(),
95		},
96	}
97	summary := mock.JobSummary(alloc.JobID)
98	state := s1.fsm.State()
99
100	require.NoError(t, state.UpsertJobSummary(999, summary))
101	require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}))
102
103	cases := []struct {
104		Name   string
105		Fields *structs.AllocStubFields
106		Assert func(t *testing.T, allocs []*structs.AllocListStub)
107	}{
108		{
109			Name:   "None",
110			Fields: nil,
111			Assert: func(t *testing.T, allocs []*structs.AllocListStub) {
112				require.Nil(t, allocs[0].AllocatedResources)
113				require.Len(t, allocs[0].TaskStates, 1)
114			},
115		},
116		{
117			Name:   "Default",
118			Fields: structs.NewAllocStubFields(),
119			Assert: func(t *testing.T, allocs []*structs.AllocListStub) {
120				require.Nil(t, allocs[0].AllocatedResources)
121				require.Len(t, allocs[0].TaskStates, 1)
122			},
123		},
124		{
125			Name: "Resources",
126			Fields: &structs.AllocStubFields{
127				Resources:  true,
128				TaskStates: false,
129			},
130			Assert: func(t *testing.T, allocs []*structs.AllocListStub) {
131				require.NotNil(t, allocs[0].AllocatedResources)
132				require.Len(t, allocs[0].TaskStates, 0)
133			},
134		},
135		{
136			Name: "NoTaskStates",
137			Fields: &structs.AllocStubFields{
138				Resources:  false,
139				TaskStates: false,
140			},
141			Assert: func(t *testing.T, allocs []*structs.AllocListStub) {
142				require.Nil(t, allocs[0].AllocatedResources)
143				require.Len(t, allocs[0].TaskStates, 0)
144			},
145		},
146		{
147			Name: "Both",
148			Fields: &structs.AllocStubFields{
149				Resources:  true,
150				TaskStates: true,
151			},
152			Assert: func(t *testing.T, allocs []*structs.AllocListStub) {
153				require.NotNil(t, allocs[0].AllocatedResources)
154				require.Len(t, allocs[0].TaskStates, 1)
155			},
156		},
157	}
158
159	for i := range cases {
160		tc := cases[i]
161		t.Run(tc.Name, func(t *testing.T) {
162			get := &structs.AllocListRequest{
163				QueryOptions: structs.QueryOptions{
164					Region:    "global",
165					Namespace: structs.DefaultNamespace,
166				},
167				Fields: tc.Fields,
168			}
169			var resp structs.AllocListResponse
170			require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp))
171			require.Equal(t, uint64(1000), resp.Index)
172			require.Len(t, resp.Allocations, 1)
173			require.Equal(t, alloc.ID, resp.Allocations[0].ID)
174			tc.Assert(t, resp.Allocations)
175		})
176	}
177
178}
179
180func TestAllocEndpoint_List_ACL(t *testing.T) {
181	t.Parallel()
182
183	s1, root, cleanupS1 := TestACLServer(t, nil)
184	defer cleanupS1()
185	codec := rpcClient(t, s1)
186	testutil.WaitForLeader(t, s1.RPC)
187	assert := assert.New(t)
188
189	// Create the alloc
190	alloc := mock.Alloc()
191	allocs := []*structs.Allocation{alloc}
192	summary := mock.JobSummary(alloc.JobID)
193	state := s1.fsm.State()
194
195	assert.Nil(state.UpsertJobSummary(999, summary), "UpsertJobSummary")
196	assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, allocs), "UpsertAllocs")
197
198	stubAllocs := []*structs.AllocListStub{alloc.Stub(nil)}
199	stubAllocs[0].CreateIndex = 1000
200	stubAllocs[0].ModifyIndex = 1000
201
202	// Create the namespace policy and tokens
203	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid",
204		mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
205	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid",
206		mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs}))
207
208	// Lookup the allocs without a token and expect failure
209	get := &structs.AllocListRequest{
210		QueryOptions: structs.QueryOptions{
211			Region:    "global",
212			Namespace: structs.DefaultNamespace,
213		},
214	}
215	var resp structs.AllocListResponse
216	assert.NotNil(msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp), "RPC")
217
218	// Try with a valid token
219	get.AuthToken = validToken.SecretID
220	assert.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp), "RPC")
221	assert.EqualValues(resp.Index, 1000, "resp.Index")
222	assert.Equal(stubAllocs, resp.Allocations, "Returned alloc list not equal")
223
224	// Try with a invalid token
225	get.AuthToken = invalidToken.SecretID
226	err := msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)
227	assert.NotNil(err, "RPC")
228	assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
229
230	// Try with a root token
231	get.AuthToken = root.SecretID
232	assert.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp), "RPC")
233	assert.EqualValues(resp.Index, 1000, "resp.Index")
234	assert.Equal(stubAllocs, resp.Allocations, "Returned alloc list not equal")
235}
236
237func TestAllocEndpoint_List_Blocking(t *testing.T) {
238	t.Parallel()
239
240	s1, cleanupS1 := TestServer(t, nil)
241	defer cleanupS1()
242	state := s1.fsm.State()
243	codec := rpcClient(t, s1)
244	testutil.WaitForLeader(t, s1.RPC)
245
246	// Create the alloc
247	alloc := mock.Alloc()
248
249	summary := mock.JobSummary(alloc.JobID)
250	if err := state.UpsertJobSummary(1, summary); err != nil {
251		t.Fatalf("err: %v", err)
252	}
253	// Upsert alloc triggers watches
254	time.AfterFunc(100*time.Millisecond, func() {
255		if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 2, []*structs.Allocation{alloc}); err != nil {
256			t.Fatalf("err: %v", err)
257		}
258	})
259
260	req := &structs.AllocListRequest{
261		QueryOptions: structs.QueryOptions{
262			Region:        "global",
263			Namespace:     structs.DefaultNamespace,
264			MinQueryIndex: 1,
265		},
266	}
267	start := time.Now()
268	var resp structs.AllocListResponse
269	if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp); err != nil {
270		t.Fatalf("err: %v", err)
271	}
272
273	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
274		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
275	}
276	if resp.Index != 2 {
277		t.Fatalf("Bad index: %d %d", resp.Index, 2)
278	}
279	if len(resp.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID {
280		t.Fatalf("bad: %#v", resp.Allocations)
281	}
282
283	// Client updates trigger watches
284	alloc2 := mock.Alloc()
285	alloc2.ID = alloc.ID
286	alloc2.ClientStatus = structs.AllocClientStatusRunning
287	time.AfterFunc(100*time.Millisecond, func() {
288		state.UpsertJobSummary(3, mock.JobSummary(alloc2.JobID))
289		if err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 4, []*structs.Allocation{alloc2}); err != nil {
290			t.Fatalf("err: %v", err)
291		}
292	})
293
294	req.MinQueryIndex = 3
295	start = time.Now()
296	var resp2 structs.AllocListResponse
297	if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp2); err != nil {
298		t.Fatalf("err: %v", err)
299	}
300
301	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
302		t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
303	}
304	if resp2.Index != 4 {
305		t.Fatalf("Bad index: %d %d", resp2.Index, 4)
306	}
307	if len(resp2.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID ||
308		resp2.Allocations[0].ClientStatus != structs.AllocClientStatusRunning {
309		t.Fatalf("bad: %#v", resp2.Allocations)
310	}
311}
312
313// TestAllocEndpoint_List_AllNamespaces_OSS asserts that server
314// returns all allocations across namespaces.
315func TestAllocEndpoint_List_AllNamespaces_OSS(t *testing.T) {
316	t.Parallel()
317
318	s1, cleanupS1 := TestServer(t, nil)
319	defer cleanupS1()
320	codec := rpcClient(t, s1)
321	testutil.WaitForLeader(t, s1.RPC)
322	state := s1.fsm.State()
323
324	// two namespaces
325	ns1 := mock.Namespace()
326	ns2 := mock.Namespace()
327	require.NoError(t, state.UpsertNamespaces(900, []*structs.Namespace{ns1, ns2}))
328
329	// Create the allocations
330	alloc1 := mock.Alloc()
331	alloc1.ID = "a" + alloc1.ID[1:]
332	alloc1.Namespace = ns1.Name
333	alloc2 := mock.Alloc()
334	alloc2.ID = "b" + alloc2.ID[1:]
335	alloc2.Namespace = ns2.Name
336	summary1 := mock.JobSummary(alloc1.JobID)
337	summary2 := mock.JobSummary(alloc2.JobID)
338
339	require.NoError(t, state.UpsertJobSummary(999, summary1))
340	require.NoError(t, state.UpsertJobSummary(999, summary2))
341	require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}))
342
343	t.Run("looking up all allocations", func(t *testing.T) {
344		get := &structs.AllocListRequest{
345			QueryOptions: structs.QueryOptions{
346				Region:    "global",
347				Namespace: "*",
348			},
349		}
350		var resp structs.AllocListResponse
351		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp))
352		require.Equal(t, uint64(1000), resp.Index)
353		require.Len(t, resp.Allocations, 2)
354		require.ElementsMatch(t,
355			[]string{resp.Allocations[0].ID, resp.Allocations[1].ID},
356			[]string{alloc1.ID, alloc2.ID})
357	})
358
359	t.Run("looking up allocations with prefix", func(t *testing.T) {
360		get := &structs.AllocListRequest{
361			QueryOptions: structs.QueryOptions{
362				Region:    "global",
363				Namespace: "*",
364				// allocations were constructed above to have non-matching prefix
365				Prefix: alloc1.ID[:4],
366			},
367		}
368		var resp structs.AllocListResponse
369		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp))
370		require.Equal(t, uint64(1000), resp.Index)
371		require.Len(t, resp.Allocations, 1)
372		require.Equal(t, alloc1.ID, resp.Allocations[0].ID)
373		require.Equal(t, alloc1.Namespace, resp.Allocations[0].Namespace)
374	})
375
376	t.Run("looking up allocations with mismatch prefix", func(t *testing.T) {
377		// allocations were constructed above to have prefix starting with "a" or "b"
378		badPrefix := "cc"
379
380		get := &structs.AllocListRequest{
381			QueryOptions: structs.QueryOptions{
382				Region:    "global",
383				Namespace: "*",
384				Prefix:    badPrefix,
385			},
386		}
387		var resp structs.AllocListResponse
388		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp))
389		require.Equal(t, uint64(1000), resp.Index)
390		require.Empty(t, resp.Allocations)
391	})
392}
393
394func TestAllocEndpoint_GetAlloc(t *testing.T) {
395	t.Parallel()
396
397	s1, cleanupS1 := TestServer(t, nil)
398	defer cleanupS1()
399	codec := rpcClient(t, s1)
400	testutil.WaitForLeader(t, s1.RPC)
401
402	// Create the register request
403	prevAllocID := uuid.Generate()
404	alloc := mock.Alloc()
405	alloc.RescheduleTracker = &structs.RescheduleTracker{
406		Events: []*structs.RescheduleEvent{
407			{RescheduleTime: time.Now().UTC().UnixNano(), PrevNodeID: "boom", PrevAllocID: prevAllocID},
408		},
409	}
410	state := s1.fsm.State()
411	state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))
412	err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})
413	if err != nil {
414		t.Fatalf("err: %v", err)
415	}
416
417	// Lookup the alloc
418	get := &structs.AllocSpecificRequest{
419		AllocID:      alloc.ID,
420		QueryOptions: structs.QueryOptions{Region: "global"},
421	}
422	var resp structs.SingleAllocResponse
423	if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp); err != nil {
424		t.Fatalf("err: %v", err)
425	}
426	if resp.Index != 1000 {
427		t.Fatalf("Bad index: %d %d", resp.Index, 1000)
428	}
429
430	if !reflect.DeepEqual(alloc, resp.Alloc) {
431		t.Fatalf("bad: %#v", resp.Alloc)
432	}
433}
434
435func TestAllocEndpoint_GetAlloc_ACL(t *testing.T) {
436	t.Parallel()
437
438	s1, root, cleanupS1 := TestACLServer(t, nil)
439	defer cleanupS1()
440	codec := rpcClient(t, s1)
441	testutil.WaitForLeader(t, s1.RPC)
442	assert := assert.New(t)
443
444	// Create the alloc
445	alloc := mock.Alloc()
446	allocs := []*structs.Allocation{alloc}
447	summary := mock.JobSummary(alloc.JobID)
448	state := s1.fsm.State()
449
450	assert.Nil(state.UpsertJobSummary(999, summary), "UpsertJobSummary")
451	assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, allocs), "UpsertAllocs")
452
453	// Create the namespace policy and tokens
454	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid",
455		mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
456	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid",
457		mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs}))
458
459	getReq := func() *structs.AllocSpecificRequest {
460		return &structs.AllocSpecificRequest{
461			AllocID: alloc.ID,
462			QueryOptions: structs.QueryOptions{
463				Region: "global",
464			},
465		}
466	}
467
468	cases := []struct {
469		Name string
470		F    func(t *testing.T)
471	}{
472		// Lookup the alloc without a token and expect failure
473		{
474			Name: "no-token",
475			F: func(t *testing.T) {
476				var resp structs.SingleAllocResponse
477				err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", getReq(), &resp)
478				require.True(t, structs.IsErrUnknownAllocation(err), "expected unknown alloc but found: %v", err)
479			},
480		},
481
482		// Try with a valid ACL token
483		{
484			Name: "valid-token",
485			F: func(t *testing.T) {
486				get := getReq()
487				get.AuthToken = validToken.SecretID
488				get.AllocID = alloc.ID
489				var resp structs.SingleAllocResponse
490				require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp), "RPC")
491				require.EqualValues(t, resp.Index, 1000, "resp.Index")
492				require.Equal(t, alloc, resp.Alloc, "Returned alloc not equal")
493			},
494		},
495
496		// Try with a valid Node.SecretID
497		{
498			Name: "valid-node-secret",
499			F: func(t *testing.T) {
500				node := mock.Node()
501				assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1005, node))
502				get := getReq()
503				get.AuthToken = node.SecretID
504				get.AllocID = alloc.ID
505				var resp structs.SingleAllocResponse
506				require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp), "RPC")
507				require.EqualValues(t, resp.Index, 1000, "resp.Index")
508				require.Equal(t, alloc, resp.Alloc, "Returned alloc not equal")
509			},
510		},
511
512		// Try with a invalid token
513		{
514			Name: "invalid-token",
515			F: func(t *testing.T) {
516				get := getReq()
517				get.AuthToken = invalidToken.SecretID
518				get.AllocID = alloc.ID
519				var resp structs.SingleAllocResponse
520				err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp)
521				require.NotNil(t, err, "RPC")
522				require.True(t, structs.IsErrUnknownAllocation(err), "expected unknown alloc but found: %v", err)
523			},
524		},
525
526		// Try with a root token
527		{
528			Name: "root-token",
529			F: func(t *testing.T) {
530				get := getReq()
531				get.AuthToken = root.SecretID
532				get.AllocID = alloc.ID
533				var resp structs.SingleAllocResponse
534				require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp), "RPC")
535				require.EqualValues(t, resp.Index, 1000, "resp.Index")
536				require.Equal(t, alloc, resp.Alloc, "Returned alloc not equal")
537			},
538		},
539	}
540
541	for _, tc := range cases {
542		t.Run(tc.Name, tc.F)
543	}
544}
545
546func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {
547	t.Parallel()
548
549	s1, cleanupS1 := TestServer(t, nil)
550	defer cleanupS1()
551	state := s1.fsm.State()
552	codec := rpcClient(t, s1)
553	testutil.WaitForLeader(t, s1.RPC)
554
555	// Create the allocs
556	alloc1 := mock.Alloc()
557	alloc2 := mock.Alloc()
558
559	// First create an unrelated alloc
560	time.AfterFunc(100*time.Millisecond, func() {
561		state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
562		err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc1})
563		if err != nil {
564			t.Fatalf("err: %v", err)
565		}
566	})
567
568	// Create the alloc we are watching later
569	time.AfterFunc(200*time.Millisecond, func() {
570		state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID))
571		err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{alloc2})
572		if err != nil {
573			t.Fatalf("err: %v", err)
574		}
575	})
576
577	// Lookup the allocs
578	get := &structs.AllocSpecificRequest{
579		AllocID: alloc2.ID,
580		QueryOptions: structs.QueryOptions{
581			Region:        "global",
582			MinQueryIndex: 150,
583		},
584	}
585	var resp structs.SingleAllocResponse
586	start := time.Now()
587	if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp); err != nil {
588		t.Fatalf("err: %v", err)
589	}
590
591	if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
592		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
593	}
594	if resp.Index != 200 {
595		t.Fatalf("Bad index: %d %d", resp.Index, 200)
596	}
597	if resp.Alloc == nil || resp.Alloc.ID != alloc2.ID {
598		t.Fatalf("bad: %#v", resp.Alloc)
599	}
600}
601
602func TestAllocEndpoint_GetAllocs(t *testing.T) {
603	t.Parallel()
604
605	s1, cleanupS1 := TestServer(t, nil)
606	defer cleanupS1()
607	codec := rpcClient(t, s1)
608	testutil.WaitForLeader(t, s1.RPC)
609
610	// Create the register request
611	alloc := mock.Alloc()
612	alloc2 := mock.Alloc()
613	state := s1.fsm.State()
614	state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))
615	state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
616	err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc, alloc2})
617	if err != nil {
618		t.Fatalf("err: %v", err)
619	}
620
621	// Lookup the allocs
622	get := &structs.AllocsGetRequest{
623		AllocIDs: []string{alloc.ID, alloc2.ID},
624		QueryOptions: structs.QueryOptions{
625			Region: "global",
626		},
627	}
628	var resp structs.AllocsGetResponse
629	if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
630		t.Fatalf("err: %v", err)
631	}
632	if resp.Index != 1000 {
633		t.Fatalf("Bad index: %d %d", resp.Index, 1000)
634	}
635
636	if len(resp.Allocs) != 2 {
637		t.Fatalf("bad: %#v", resp.Allocs)
638	}
639
640	// Lookup nonexistent allocs.
641	get = &structs.AllocsGetRequest{
642		AllocIDs:     []string{"foo"},
643		QueryOptions: structs.QueryOptions{Region: "global"},
644	}
645	if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err == nil {
646		t.Fatalf("expect error")
647	}
648}
649
650func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) {
651	t.Parallel()
652
653	s1, cleanupS1 := TestServer(t, nil)
654	defer cleanupS1()
655	state := s1.fsm.State()
656	codec := rpcClient(t, s1)
657	testutil.WaitForLeader(t, s1.RPC)
658
659	// Create the allocs
660	alloc1 := mock.Alloc()
661	alloc2 := mock.Alloc()
662
663	// First create an unrelated alloc
664	time.AfterFunc(100*time.Millisecond, func() {
665		state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
666		err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc1})
667		if err != nil {
668			t.Fatalf("err: %v", err)
669		}
670	})
671
672	// Create the alloc we are watching later
673	time.AfterFunc(200*time.Millisecond, func() {
674		state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID))
675		err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{alloc2})
676		if err != nil {
677			t.Fatalf("err: %v", err)
678		}
679	})
680
681	// Lookup the allocs
682	get := &structs.AllocsGetRequest{
683		AllocIDs: []string{alloc1.ID, alloc2.ID},
684		QueryOptions: structs.QueryOptions{
685			Region:        "global",
686			MinQueryIndex: 150,
687		},
688	}
689	var resp structs.AllocsGetResponse
690	start := time.Now()
691	if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
692		t.Fatalf("err: %v", err)
693	}
694
695	if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
696		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
697	}
698	if resp.Index != 200 {
699		t.Fatalf("Bad index: %d %d", resp.Index, 200)
700	}
701	if len(resp.Allocs) != 2 {
702		t.Fatalf("bad: %#v", resp.Allocs)
703	}
704}
705
706func TestAllocEndpoint_UpdateDesiredTransition(t *testing.T) {
707	t.Parallel()
708	require := require.New(t)
709
710	s1, _, cleanupS1 := TestACLServer(t, nil)
711	defer cleanupS1()
712	codec := rpcClient(t, s1)
713	testutil.WaitForLeader(t, s1.RPC)
714
715	// Create the register request
716	alloc := mock.Alloc()
717	alloc2 := mock.Alloc()
718	state := s1.fsm.State()
719	require.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)))
720	require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
721	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc, alloc2}))
722
723	t1 := &structs.DesiredTransition{
724		Migrate: helper.BoolToPtr(true),
725	}
726
727	// Update the allocs desired status
728	get := &structs.AllocUpdateDesiredTransitionRequest{
729		Allocs: map[string]*structs.DesiredTransition{
730			alloc.ID:  t1,
731			alloc2.ID: t1,
732		},
733		Evals: []*structs.Evaluation{
734			{
735				ID:             uuid.Generate(),
736				Namespace:      alloc.Namespace,
737				Priority:       alloc.Job.Priority,
738				Type:           alloc.Job.Type,
739				TriggeredBy:    structs.EvalTriggerNodeDrain,
740				JobID:          alloc.Job.ID,
741				JobModifyIndex: alloc.Job.ModifyIndex,
742				Status:         structs.EvalStatusPending,
743			},
744			{
745				ID:             uuid.Generate(),
746				Namespace:      alloc2.Namespace,
747				Priority:       alloc2.Job.Priority,
748				Type:           alloc2.Job.Type,
749				TriggeredBy:    structs.EvalTriggerNodeDrain,
750				JobID:          alloc2.Job.ID,
751				JobModifyIndex: alloc2.Job.ModifyIndex,
752				Status:         structs.EvalStatusPending,
753			},
754		},
755		WriteRequest: structs.WriteRequest{
756			Region: "global",
757		},
758	}
759
760	// Try without permissions
761	var resp structs.GenericResponse
762	err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp)
763	require.NotNil(err)
764	require.True(structs.IsErrPermissionDenied(err))
765
766	// Try with permissions
767	get.WriteRequest.AuthToken = s1.getLeaderAcl()
768	var resp2 structs.GenericResponse
769	require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp2))
770	require.NotZero(resp2.Index)
771
772	// Look up the allocations
773	out1, err := state.AllocByID(nil, alloc.ID)
774	require.Nil(err)
775	out2, err := state.AllocByID(nil, alloc.ID)
776	require.Nil(err)
777	e1, err := state.EvalByID(nil, get.Evals[0].ID)
778	require.Nil(err)
779	e2, err := state.EvalByID(nil, get.Evals[1].ID)
780	require.Nil(err)
781
782	require.NotNil(out1.DesiredTransition.Migrate)
783	require.NotNil(out2.DesiredTransition.Migrate)
784	require.NotNil(e1)
785	require.NotNil(e2)
786	require.True(*out1.DesiredTransition.Migrate)
787	require.True(*out2.DesiredTransition.Migrate)
788}
789
790func TestAllocEndpoint_Stop_ACL(t *testing.T) {
791	t.Parallel()
792	require := require.New(t)
793
794	s1, _, cleanupS1 := TestACLServer(t, nil)
795	defer cleanupS1()
796	codec := rpcClient(t, s1)
797	testutil.WaitForLeader(t, s1.RPC)
798
799	// Create the register request
800	alloc := mock.Alloc()
801	alloc2 := mock.Alloc()
802	state := s1.fsm.State()
803	require.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)))
804	require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
805	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc, alloc2}))
806
807	req := &structs.AllocStopRequest{
808		AllocID: alloc.ID,
809	}
810	req.Namespace = structs.DefaultNamespace
811	req.Region = alloc.Job.Region
812
813	// Try without permissions
814	var resp structs.AllocStopResponse
815	err := msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp)
816	require.True(structs.IsErrPermissionDenied(err), "expected permissions error, got: %v", err)
817
818	// Try with management permissions
819	req.WriteRequest.AuthToken = s1.getLeaderAcl()
820	var resp2 structs.AllocStopResponse
821	require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp2))
822	require.NotZero(resp2.Index)
823
824	// Try with alloc-lifecycle permissions
825	validToken := mock.CreatePolicyAndToken(t, state, 1002, "valid",
826		mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle}))
827	req.WriteRequest.AuthToken = validToken.SecretID
828	req.AllocID = alloc2.ID
829
830	var resp3 structs.AllocStopResponse
831	require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp3))
832	require.NotZero(resp3.Index)
833
834	// Look up the allocations
835	out1, err := state.AllocByID(nil, alloc.ID)
836	require.Nil(err)
837	out2, err := state.AllocByID(nil, alloc2.ID)
838	require.Nil(err)
839	e1, err := state.EvalByID(nil, resp2.EvalID)
840	require.Nil(err)
841	e2, err := state.EvalByID(nil, resp3.EvalID)
842	require.Nil(err)
843
844	require.NotNil(out1.DesiredTransition.Migrate)
845	require.NotNil(out2.DesiredTransition.Migrate)
846	require.NotNil(e1)
847	require.NotNil(e2)
848	require.True(*out1.DesiredTransition.Migrate)
849	require.True(*out2.DesiredTransition.Migrate)
850}
851
852func TestAllocEndpoint_List_AllNamespaces_ACL_OSS(t *testing.T) {
853	t.Parallel()
854
855	s1, root, cleanupS1 := TestACLServer(t, nil)
856	defer cleanupS1()
857	codec := rpcClient(t, s1)
858	testutil.WaitForLeader(t, s1.RPC)
859	state := s1.fsm.State()
860
861	// two namespaces
862	ns1 := mock.Namespace()
863	ns2 := mock.Namespace()
864	require.NoError(t, state.UpsertNamespaces(900, []*structs.Namespace{ns1, ns2}))
865
866	// Create the allocations
867	alloc1 := mock.Alloc()
868	alloc1.ID = "a" + alloc1.ID[1:]
869	alloc1.Namespace = ns1.Name
870	alloc2 := mock.Alloc()
871	alloc2.ID = "b" + alloc2.ID[1:]
872	alloc2.Namespace = ns2.Name
873	summary1 := mock.JobSummary(alloc1.JobID)
874	summary2 := mock.JobSummary(alloc2.JobID)
875
876	require.NoError(t, state.UpsertJobSummary(999, summary1))
877	require.NoError(t, state.UpsertJobSummary(999, summary2))
878	require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}))
879	alloc1.CreateIndex = 1000
880	alloc1.ModifyIndex = 1000
881	alloc2.CreateIndex = 1000
882	alloc2.ModifyIndex = 1000
883
884	everythingButReadJob := []string{
885		acl.NamespaceCapabilityDeny,
886		acl.NamespaceCapabilityListJobs,
887		// acl.NamespaceCapabilityReadJob,
888		acl.NamespaceCapabilitySubmitJob,
889		acl.NamespaceCapabilityDispatchJob,
890		acl.NamespaceCapabilityReadLogs,
891		acl.NamespaceCapabilityReadFS,
892		acl.NamespaceCapabilityAllocExec,
893		acl.NamespaceCapabilityAllocNodeExec,
894		acl.NamespaceCapabilityAllocLifecycle,
895		acl.NamespaceCapabilitySentinelOverride,
896		acl.NamespaceCapabilityCSIRegisterPlugin,
897		acl.NamespaceCapabilityCSIWriteVolume,
898		acl.NamespaceCapabilityCSIReadVolume,
899		acl.NamespaceCapabilityCSIListVolume,
900		acl.NamespaceCapabilityCSIMountVolume,
901		acl.NamespaceCapabilityListScalingPolicies,
902		acl.NamespaceCapabilityReadScalingPolicy,
903		acl.NamespaceCapabilityReadJobScaling,
904		acl.NamespaceCapabilityScaleJob,
905		acl.NamespaceCapabilitySubmitRecommendation,
906	}
907
908	ns1token := mock.CreatePolicyAndToken(t, state, 1001, "ns1",
909		mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob}))
910	ns1tokenInsufficient := mock.CreatePolicyAndToken(t, state, 1001, "ns1-insufficient",
911		mock.NamespacePolicy(ns1.Name, "", everythingButReadJob))
912	ns2token := mock.CreatePolicyAndToken(t, state, 1001, "ns2",
913		mock.NamespacePolicy(ns2.Name, "", []string{acl.NamespaceCapabilityReadJob}))
914	bothToken := mock.CreatePolicyAndToken(t, state, 1001, "nsBoth",
915		mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob})+
916			mock.NamespacePolicy(ns2.Name, "", []string{acl.NamespaceCapabilityReadJob}))
917
918	cases := []struct {
919		Label     string
920		Namespace string
921		Token     string
922		Allocs    []*structs.Allocation
923		Error     bool
924		Message   string
925		Prefix    string
926	}{
927		{
928			Label:     "all namespaces with sufficient token",
929			Namespace: "*",
930			Token:     bothToken.SecretID,
931			Allocs:    []*structs.Allocation{alloc1, alloc2},
932		},
933		{
934			Label:     "all namespaces with root token",
935			Namespace: "*",
936			Token:     root.SecretID,
937			Allocs:    []*structs.Allocation{alloc1, alloc2},
938		},
939		{
940			Label:     "all namespaces with ns1 token",
941			Namespace: "*",
942			Token:     ns1token.SecretID,
943			Allocs:    []*structs.Allocation{alloc1},
944		},
945		{
946			Label:     "all namespaces with ns2 token",
947			Namespace: "*",
948			Token:     ns2token.SecretID,
949			Allocs:    []*structs.Allocation{alloc2},
950		},
951		{
952			Label:     "all namespaces with bad token",
953			Namespace: "*",
954			Token:     uuid.Generate(),
955			Error:     true,
956			Message:   structs.ErrTokenNotFound.Error(),
957		},
958		{
959			Label:     "all namespaces with insufficient token",
960			Namespace: "*",
961			Allocs:    []*structs.Allocation{},
962			Token:     ns1tokenInsufficient.SecretID,
963		},
964		{
965			Label:     "ns1 with ns1 token",
966			Namespace: ns1.Name,
967			Token:     ns1token.SecretID,
968			Allocs:    []*structs.Allocation{alloc1},
969		},
970		{
971			Label:     "ns1 with root token",
972			Namespace: ns1.Name,
973			Token:     root.SecretID,
974			Allocs:    []*structs.Allocation{alloc1},
975		},
976		{
977			Label:     "ns1 with ns2 token",
978			Namespace: ns1.Name,
979			Token:     ns2token.SecretID,
980			Error:     true,
981		},
982		{
983			Label:     "ns1 with invalid token",
984			Namespace: ns1.Name,
985			Token:     uuid.Generate(),
986			Error:     true,
987			Message:   structs.ErrTokenNotFound.Error(),
988		},
989		{
990			Label:     "bad namespace with root token",
991			Namespace: uuid.Generate(),
992			Token:     root.SecretID,
993			Allocs:    []*structs.Allocation{},
994		},
995		{
996			Label:     "all namespaces with prefix",
997			Namespace: "*",
998			Prefix:    alloc1.ID[:2],
999			Token:     root.SecretID,
1000			Allocs:    []*structs.Allocation{alloc1},
1001		},
1002	}
1003
1004	for _, tc := range cases {
1005		t.Run(tc.Label, func(t *testing.T) {
1006
1007			get := &structs.AllocListRequest{
1008				QueryOptions: structs.QueryOptions{
1009					Region:    "global",
1010					Namespace: tc.Namespace,
1011					Prefix:    tc.Prefix,
1012					AuthToken: tc.Token,
1013				},
1014			}
1015			var resp structs.AllocListResponse
1016			err := msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)
1017			if tc.Error {
1018				require.Error(t, err)
1019				if tc.Message != "" {
1020					require.Equal(t, err.Error(), tc.Message)
1021				} else {
1022					require.Equal(t, err.Error(), structs.ErrPermissionDenied.Error())
1023				}
1024			} else {
1025				require.NoError(t, err)
1026				require.Equal(t, uint64(1000), resp.Index)
1027				exp := make([]*structs.AllocListStub, len(tc.Allocs))
1028				for i, a := range tc.Allocs {
1029					exp[i] = a.Stub(nil)
1030				}
1031				require.ElementsMatch(t, exp, resp.Allocations)
1032			}
1033		})
1034	}
1035
1036}
1037