package client

import (
	"encoding/json"
	"fmt"
	"io"
	"net"
	"runtime"
	"strings"
	"testing"
	"time"

	"github.com/hashicorp/nomad/acl"
	"github.com/hashicorp/nomad/client/config"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/helper/pluginutils/catalog"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	nstructs "github.com/hashicorp/nomad/nomad/structs"
	nconfig "github.com/hashicorp/nomad/nomad/structs/config"
	"github.com/hashicorp/nomad/plugins/drivers"
	"github.com/hashicorp/nomad/testutil"
	"github.com/stretchr/testify/require"
	"github.com/ugorji/go/codec"
	"golang.org/x/sys/unix"
)

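// TestAllocations_Restart asserts that the Allocations.Restart RPC fails for a
// request without an alloc ID and eventually succeeds for a locally running alloc.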
func TestAllocations_Restart(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	client, cleanup := TestClient(t, nil)
	defer cleanup()

	a := mock.Alloc()
	a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	a.Job.TaskGroups[0].RestartPolicy = &nstructs.RestartPolicy{
		Attempts: 0,
		Mode:     nstructs.RestartPolicyModeFail,
	}
	a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "10ms",
	}
	require.Nil(client.addAlloc(a, ""))

	// Try with bad alloc
	req := &nstructs.AllocRestartRequest{}
	var resp nstructs.GenericResponse
	err := client.ClientRPC("Allocations.Restart", &req, &resp)
	require.Error(err)

	// Try with good alloc
	req.AllocID = a.ID

	testutil.WaitForResult(func() (bool, error) {
		var resp2 nstructs.GenericResponse
		err := client.ClientRPC("Allocations.Restart", &req, &resp2)
		if err != nil && strings.Contains(err.Error(), "not running") {
			return false, err
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

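// TestAllocations_Restart_ACL asserts that Allocations.Restart enforces ACLs:
// missing and invalid tokens are denied, while tokens with the alloc-lifecycle
// capability and management tokens are allowed.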
func TestAllocations_Restart_ACL(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	server, addr, root, cleanupS := testACLServer(t, nil)
	defer cleanupS()

	client, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
		c.ACLEnabled = true
	})
	defer cleanupC()

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for the client to be running the job
	alloc := testutil.WaitForRunningWithToken(t, server.RPC, job, root.SecretID)[0]

	// Try request without a token and expect failure
	{
		req := &nstructs.AllocRestartRequest{}
		req.AllocID = alloc.ID
		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.Restart", &req, &resp)
		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with an invalid token and expect failure
	{
		token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{}))
		req := &nstructs.AllocRestartRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = token.SecretID

		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.Restart", &req, &resp)

		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with a valid token
	{
		policyHCL := mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle})
		token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", policyHCL)
		require.NotNil(token)
		req := &nstructs.AllocRestartRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = token.SecretID
		req.Namespace = nstructs.DefaultNamespace
		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.Restart", &req, &resp)
		require.NoError(err)
	}

	// Try request with a management token
	{
		req := &nstructs.AllocRestartRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = root.SecretID
		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.Restart", &req, &resp)
		// Depending on how quickly the alloc restarts there may be no
		// error *or* a task not running error; either is fine.
		if err != nil {
			require.Contains(err.Error(), "Task not running", err)
		}
	}
}

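// TestAllocations_GarbageCollectAll asserts that the Allocations.GarbageCollectAll
// RPC succeeds against a client without ACLs enabled.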
func TestAllocations_GarbageCollectAll(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	client, cleanup := TestClient(t, nil)
	defer cleanup()

	req := &nstructs.NodeSpecificRequest{}
	var resp nstructs.GenericResponse
	require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp))
}

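// TestAllocations_GarbageCollectAll_ACL asserts that Allocations.GarbageCollectAll
// enforces ACLs: requests without a token or with a node:deny token are denied,
// while node:write and management tokens are allowed.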
func TestAllocations_GarbageCollectAll_ACL(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	server, addr, root, cleanupS := testACLServer(t, nil)
	defer cleanupS()

	client, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
		c.ACLEnabled = true
	})
	defer cleanupC()

	// Try request without a token and expect failure
	{
		req := &nstructs.NodeSpecificRequest{}
		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)
		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with an invalid token and expect failure
	{
		token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
		req := &nstructs.NodeSpecificRequest{}
		req.AuthToken = token.SecretID

		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)

		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with a valid token
	{
		token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", mock.NodePolicy(acl.PolicyWrite))
		req := &nstructs.NodeSpecificRequest{}
		req.AuthToken = token.SecretID
		var resp nstructs.GenericResponse
		require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp))
	}

	// Try request with a management token
	{
		req := &nstructs.NodeSpecificRequest{}
		req.AuthToken = root.SecretID
		var resp nstructs.GenericResponse
		require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp))
	}
}

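// TestAllocations_GarbageCollect asserts that the Allocations.GarbageCollect RPC
// fails for an unknown alloc and eventually garbage collects a terminal alloc.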
func TestAllocations_GarbageCollect(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	client, cleanup := TestClient(t, func(c *config.Config) {
		c.GCDiskUsageThreshold = 100.0
	})
	defer cleanup()

	a := mock.Alloc()
	a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	a.Job.TaskGroups[0].RestartPolicy = &nstructs.RestartPolicy{
		Attempts: 0,
		Mode:     nstructs.RestartPolicyModeFail,
	}
	a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "10ms",
	}
	require.Nil(client.addAlloc(a, ""))

	// Try with bad alloc
	req := &nstructs.AllocSpecificRequest{}
	var resp nstructs.GenericResponse
	err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)
	require.NotNil(err)

	// Try with good alloc
	req.AllocID = a.ID
	testutil.WaitForResult(func() (bool, error) {
		// Check whether the alloc has already been removed
		if ar, ok := client.allocs[a.ID]; !ok || ar.IsDestroyed() {
			return true, nil
		}

		var resp2 nstructs.GenericResponse
		err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp2)
		return err == nil, err
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

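// TestAllocations_GarbageCollect_ACL asserts that Allocations.GarbageCollect
// enforces ACLs: missing and invalid tokens are denied, while namespace-scoped
// and management tokens pass authorization.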
func TestAllocations_GarbageCollect_ACL(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	server, addr, root, cleanupS := testACLServer(t, nil)
	defer cleanupS()

	client, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
		c.ACLEnabled = true
	})
	defer cleanupC()

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for the client to be running the job
	alloc := testutil.WaitForRunningWithToken(t, server.RPC, job, root.SecretID)[0]

	// Try request without a token and expect failure
	{
		req := &nstructs.AllocSpecificRequest{}
		req.AllocID = alloc.ID
		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)
		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with an invalid token and expect failure
	{
		token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
		req := &nstructs.AllocSpecificRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = token.SecretID

		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)

		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with a valid token
	{
		token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid",
			mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob}))
		req := &nstructs.AllocSpecificRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = token.SecretID
		req.Namespace = nstructs.DefaultNamespace

		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)
		require.True(nstructs.IsErrUnknownAllocation(err))
	}

	// Try request with a management token
	{
		req := &nstructs.AllocSpecificRequest{}
		req.AuthToken = root.SecretID

		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp)
		require.True(nstructs.IsErrUnknownAllocation(err))
	}
}

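// TestAllocations_Signal asserts that the Allocations.Signal RPC returns an
// unknown-allocation error for a bad alloc ID and a task-not-running error for
// an alloc whose task has not started.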
func TestAllocations_Signal(t *testing.T) {
	t.Parallel()

	client, cleanup := TestClient(t, nil)
	defer cleanup()

	a := mock.Alloc()
	require.Nil(t, client.addAlloc(a, ""))

	// Try with bad alloc
	req := &nstructs.AllocSignalRequest{}
	var resp nstructs.GenericResponse
	err := client.ClientRPC("Allocations.Signal", &req, &resp)
	require.NotNil(t, err)
	require.True(t, nstructs.IsErrUnknownAllocation(err))

	// Try with good alloc
	req.AllocID = a.ID

	var resp2 nstructs.GenericResponse
	err = client.ClientRPC("Allocations.Signal", &req, &resp2)

	require.Error(t, err, "Expected error, got: %s, resp: %#+v", err, resp2)
	require.Contains(t, err.Error(), "Failed to signal task: web, err: Task not running")
}

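// TestAllocations_Signal_ACL asserts that Allocations.Signal enforces ACLs:
// missing and invalid tokens are denied, while tokens with the alloc-lifecycle
// capability and management tokens are allowed.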
func TestAllocations_Signal_ACL(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	server, addr, root, cleanupS := testACLServer(t, nil)
	defer cleanupS()

	client, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
		c.ACLEnabled = true
	})
	defer cleanupC()

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for the client to be running the job
	alloc := testutil.WaitForRunningWithToken(t, server.RPC, job, root.SecretID)[0]

	// Try request without a token and expect failure
	{
		req := &nstructs.AllocSignalRequest{}
		req.AllocID = alloc.ID
		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.Signal", &req, &resp)
		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with an invalid token and expect failure
	{
		token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
		req := &nstructs.AllocSignalRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = token.SecretID

		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.Signal", &req, &resp)

		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with a valid token
	{
		token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid",
			mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle}))
		req := &nstructs.AllocSignalRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = token.SecretID
		req.Namespace = nstructs.DefaultNamespace

		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.Signal", &req, &resp)
		require.NoError(err)
	}

	// Try request with a management token
	{
		req := &nstructs.AllocSignalRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = root.SecretID

		var resp nstructs.GenericResponse
		err := client.ClientRPC("Allocations.Signal", &req, &resp)
		require.NoError(err)
	}
}

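// TestAllocations_Stats asserts that the Allocations.Stats RPC fails for an
// unknown alloc and returns a stats object for a known alloc.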
func TestAllocations_Stats(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	client, cleanup := TestClient(t, nil)
	defer cleanup()

	a := mock.Alloc()
	require.Nil(client.addAlloc(a, ""))

	// Try with bad alloc
	req := &cstructs.AllocStatsRequest{}
	var resp cstructs.AllocStatsResponse
	err := client.ClientRPC("Allocations.Stats", &req, &resp)
	require.NotNil(err)

	// Try with good alloc
	req.AllocID = a.ID
	testutil.WaitForResult(func() (bool, error) {
		var resp2 cstructs.AllocStatsResponse
		err := client.ClientRPC("Allocations.Stats", &req, &resp2)
		if err != nil {
			return false, err
		}
		if resp2.Stats == nil {
			return false, fmt.Errorf("invalid stats object")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

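// TestAllocations_Stats_ACL asserts that Allocations.Stats enforces ACLs:
// missing and invalid tokens are denied, while tokens with the read-job
// capability and management tokens are allowed.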
func TestAllocations_Stats_ACL(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	server, addr, root, cleanupS := testACLServer(t, nil)
	defer cleanupS()

	client, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
		c.ACLEnabled = true
	})
	defer cleanupC()

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for the client to be running the job
	alloc := testutil.WaitForRunningWithToken(t, server.RPC, job, root.SecretID)[0]

	// Try request without a token and expect failure
	{
		req := &cstructs.AllocStatsRequest{}
		req.AllocID = alloc.ID
		var resp cstructs.AllocStatsResponse
		err := client.ClientRPC("Allocations.Stats", &req, &resp)
		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with an invalid token and expect failure
	{
		token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
		req := &cstructs.AllocStatsRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = token.SecretID

		var resp cstructs.AllocStatsResponse
		err := client.ClientRPC("Allocations.Stats", &req, &resp)

		require.NotNil(err)
		require.EqualError(err, nstructs.ErrPermissionDenied.Error())
	}

	// Try request with a valid token
	{
		token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid",
			mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
		req := &cstructs.AllocStatsRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = token.SecretID
		req.Namespace = nstructs.DefaultNamespace

		var resp cstructs.AllocStatsResponse
		err := client.ClientRPC("Allocations.Stats", &req, &resp)
		require.NoError(err)
	}

	// Try request with a management token
	{
		req := &cstructs.AllocStatsRequest{}
		req.AllocID = alloc.ID
		req.AuthToken = root.SecretID

		var resp cstructs.AllocStatsResponse
		err := client.ClientRPC("Allocations.Stats", &req, &resp)
		require.NoError(err)
	}
}

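// TestAlloc_ExecStreaming asserts that the Allocations.Exec streaming RPC
// returns the stdout, stderr, and exit code produced by the mock driver's
// exec_command.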
func TestAlloc_ExecStreaming(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	expectedStdout := "Hello from the other side\n"
	expectedStderr := "Hello from the other side\n"
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
		"exec_command": map[string]interface{}{
			"run_for":       "1ms",
			"stdout_string": expectedStdout,
			"stderr_string": expectedStderr,
			"exit_code":     3,
		},
	}

	// Wait for the client to be running the job
	testutil.WaitForRunning(t, s.RPC, job)

	// Get the allocation ID
	args := nstructs.AllocListRequest{}
	args.Region = "global"
	resp := nstructs.AllocListResponse{}
	require.NoError(s.RPC("Alloc.List", &args, &resp))
	require.Len(resp.Allocations, 1)
	allocID := resp.Allocations[0].ID

	// Make the request
	req := &cstructs.AllocExecRequest{
		AllocID:      allocID,
		Task:         job.TaskGroups[0].Tasks[0].Name,
		Tty:          true,
		Cmd:          []string{"placeholder command"},
		QueryOptions: nstructs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("Allocations.Exec")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	frames := make(chan *drivers.ExecTaskStreamingResponseMsg)

	// Start the handler
	go handler(p2)
	go decodeFrames(t, p1, frames, errCh)

	// Send the request
	encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(3 * time.Second)

	exitCode := -1
	receivedStdout := ""
	receivedStderr := ""

OUTER:
	for {
		select {
		case <-timeout:
			// timed out: report what was received before failing
			require.Equal(expectedStdout, receivedStdout, "didn't receive expected stdout")
			require.Equal(expectedStderr, receivedStderr, "didn't receive expected stderr")
			require.Equal(3, exitCode, "failed to get exit code")
			require.FailNow("timed out")
		case err := <-errCh:
			require.NoError(err)
		case f := <-frames:
			switch {
			case f.Stdout != nil && len(f.Stdout.Data) != 0:
				receivedStdout += string(f.Stdout.Data)
			case f.Stderr != nil && len(f.Stderr.Data) != 0:
				receivedStderr += string(f.Stderr.Data)
			case f.Exited && f.Result != nil:
				exitCode = int(f.Result.ExitCode)
			default:
				t.Logf("received irrelevant frame: %v", f)
			}

			if expectedStdout == receivedStdout && expectedStderr == receivedStderr && exitCode == 3 {
				break OUTER
			}
		}
	}
}

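// TestAlloc_ExecStreaming_NoAllocation asserts that the Allocations.Exec
// streaming RPC returns an unknown-allocation error for a random alloc ID.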
func TestAlloc_ExecStreaming_NoAllocation(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	// Make the request
	req := &cstructs.AllocExecRequest{
		AllocID:      uuid.Generate(),
		Task:         "testtask",
		Tty:          true,
		Cmd:          []string{"placeholder command"},
		QueryOptions: nstructs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("Allocations.Exec")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	frames := make(chan *drivers.ExecTaskStreamingResponseMsg)

	// Start the handler
	go handler(p2)
	go decodeFrames(t, p1, frames, errCh)

	// Send the request
	encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(3 * time.Second)

	select {
	case <-timeout:
		require.FailNow("timed out")
	case err := <-errCh:
		require.True(nstructs.IsErrUnknownAllocation(err), "expected no allocation error but found: %v", err)
	case f := <-frames:
		require.Fail("received unexpected frame", "frame: %#v", f)
	}
}

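// TestAlloc_ExecStreaming_DisableRemoteExec asserts that the Allocations.Exec
// streaming RPC is rejected with a permission denied error when the client
// disables remote exec.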
func TestAlloc_ExecStreaming_DisableRemoteExec(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
		c.DisableRemoteExec = true
	})
	defer cleanupC()

	// Make the request
	req := &cstructs.AllocExecRequest{
		AllocID:      uuid.Generate(),
		Task:         "testtask",
		Tty:          true,
		Cmd:          []string{"placeholder command"},
		QueryOptions: nstructs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("Allocations.Exec")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	frames := make(chan *drivers.ExecTaskStreamingResponseMsg)

	// Start the handler
	go handler(p2)
	go decodeFrames(t, p1, frames, errCh)

	// Send the request
	encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(3 * time.Second)

	select {
	case <-timeout:
		require.FailNow("timed out")
	case err := <-errCh:
		require.True(nstructs.IsErrPermissionDenied(err), "expected permission denied error but found: %v", err)
	case f := <-frames:
		require.Fail("received unexpected frame", "frame: %#v", f)
	}
}

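// TestAlloc_ExecStreaming_ACL_Basic asserts that the Allocations.Exec streaming
// RPC enforces ACLs: a token without the alloc-exec capability is denied, while
// authorized tokens pass authorization and fail with "task not found" for a
// nonexistent task.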
func TestAlloc_ExecStreaming_ACL_Basic(t *testing.T) {
	t.Parallel()

	// Start a server and client
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanupC := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	// Create a bad token
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityAllocExec, acl.NamespaceCapabilityReadFS})
	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for the client to be running the job
	alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:          "good token",
			Token:         tokenGood.SecretID,
			ExpectedError: "task not found",
		},
		{
			Name:          "root token",
			Token:         root.SecretID,
			ExpectedError: "task not found",
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {

			// Make the request
			req := &cstructs.AllocExecRequest{
				AllocID: alloc.ID,
				Task:    "testtask",
				Tty:     true,
				Cmd:     []string{"placeholder command"},
				QueryOptions: nstructs.QueryOptions{
					Region:    "global",
					AuthToken: c.Token,
					Namespace: nstructs.DefaultNamespace,
				},
			}

			// Get the handler
			handler, err := client.StreamingRpcHandler("Allocations.Exec")
			require.Nil(t, err)

			// Create a pipe
			p1, p2 := net.Pipe()
			defer p1.Close()
			defer p2.Close()

			errCh := make(chan error)
			frames := make(chan *drivers.ExecTaskStreamingResponseMsg)

			// Start the handler
			go handler(p2)
			go decodeFrames(t, p1, frames, errCh)

			// Send the request
			encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle)
			require.Nil(t, encoder.Encode(req))

			select {
			case <-time.After(3 * time.Second):
				require.FailNow(t, "timed out")
			case err := <-errCh:
				require.Contains(t, err.Error(), c.ExpectedError)
			case f := <-frames:
				require.Fail(t, "received unexpected frame", "frame: %#v", f)
			}
		})
	}
}

// TestAlloc_ExecStreaming_ACL_WithIsolation_Image asserts that a token only
// needs the alloc-exec ACL capability when image filesystem isolation is used.
func TestAlloc_ExecStreaming_ACL_WithIsolation_Image(t *testing.T) {
	t.Parallel()
	isolation := drivers.FSIsolationImage

	// Start a server and client
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanupC := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}

		pluginConfig := []*nconfig.PluginConfig{
			{
				Name: "mock_driver",
				Config: map[string]interface{}{
					"fs_isolation": string(isolation),
				},
			},
		}

		c.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", map[string]string{}, pluginConfig)
	})
	defer cleanupC()

	// Create a bad token
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

	policyAllocExec := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityAllocExec})
	tokenAllocExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyAllocExec)

	policyAllocNodeExec := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityAllocExec, acl.NamespaceCapabilityAllocNodeExec})
	tokenAllocNodeExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyAllocNodeExec)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
		"exec_command": map[string]interface{}{
			"run_for":       "1ms",
			"stdout_string": "some output",
		},
	}

	// Wait for the client to be running the job
	testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)

	// Get the allocation ID
	args := nstructs.AllocListRequest{}
	args.Region = "global"
	args.AuthToken = root.SecretID
	args.Namespace = nstructs.DefaultNamespace
	resp := nstructs.AllocListResponse{}
	require.NoError(t, s.RPC("Alloc.List", &args, &resp))
	require.Len(t, resp.Allocations, 1)
	allocID := resp.Allocations[0].ID

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:          "alloc-exec token",
			Token:         tokenAllocExec.SecretID,
			ExpectedError: "",
		},
		{
			Name:          "alloc-node-exec token",
			Token:         tokenAllocNodeExec.SecretID,
			ExpectedError: "",
		},
		{
			Name:          "root token",
			Token:         root.SecretID,
			ExpectedError: "",
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {

			// Make the request
			req := &cstructs.AllocExecRequest{
				AllocID: allocID,
				Task:    job.TaskGroups[0].Tasks[0].Name,
				Tty:     true,
				Cmd:     []string{"placeholder command"},
				QueryOptions: nstructs.QueryOptions{
					Region:    "global",
					AuthToken: c.Token,
					Namespace: nstructs.DefaultNamespace,
				},
			}

			// Get the handler
			handler, err := client.StreamingRpcHandler("Allocations.Exec")
			require.Nil(t, err)

			// Create a pipe
			p1, p2 := net.Pipe()
			defer p1.Close()
			defer p2.Close()

			errCh := make(chan error)
			frames := make(chan *drivers.ExecTaskStreamingResponseMsg)

			// Start the handler
			go handler(p2)
			go decodeFrames(t, p1, frames, errCh)

			// Send the request
			encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle)
			require.Nil(t, encoder.Encode(req))

			select {
			case <-time.After(3 * time.Second):
			case err := <-errCh:
				if c.ExpectedError == "" {
					require.NoError(t, err)
				} else {
					require.Contains(t, err.Error(), c.ExpectedError)
				}
			case f := <-frames:
				// we are good if we don't expect an error
				if c.ExpectedError != "" {
					require.Fail(t, "unexpected frame", "frame: %#v", f)
				}
			}
		})
	}
}

// TestAlloc_ExecStreaming_ACL_WithIsolation_Chroot asserts that a token only
// needs the alloc-exec ACL capability when chroot filesystem isolation is used.
func TestAlloc_ExecStreaming_ACL_WithIsolation_Chroot(t *testing.T) {
	t.Parallel()

	if runtime.GOOS != "linux" || unix.Geteuid() != 0 {
		t.Skip("chroot isolation requires linux root")
	}

	isolation := drivers.FSIsolationChroot

	// Start a server and client
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}

		pluginConfig := []*nconfig.PluginConfig{
			{
				Name: "mock_driver",
				Config: map[string]interface{}{
					"fs_isolation": string(isolation),
				},
			},
		}

		c.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", map[string]string{}, pluginConfig)
	})
	defer cleanup()

	// Create a bad token
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

	policyAllocExec := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityAllocExec})
	tokenAllocExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "alloc-exec", policyAllocExec)

	policyAllocNodeExec := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityAllocExec, acl.NamespaceCapabilityAllocNodeExec})
	tokenAllocNodeExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "alloc-node-exec", policyAllocNodeExec)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
		"exec_command": map[string]interface{}{
			"run_for":       "1ms",
			"stdout_string": "some output",
		},
	}

	// Wait for the client to be running the job
	testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)

	// Get the allocation ID
	args := nstructs.AllocListRequest{}
	args.Region = "global"
	args.AuthToken = root.SecretID
	args.Namespace = nstructs.DefaultNamespace
	resp := nstructs.AllocListResponse{}
	require.NoError(t, s.RPC("Alloc.List", &args, &resp))
	require.Len(t, resp.Allocations, 1)
	allocID := resp.Allocations[0].ID

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:          "alloc-exec token",
			Token:         tokenAllocExec.SecretID,
			ExpectedError: "",
		},
		{
			Name:          "alloc-node-exec token",
			Token:         tokenAllocNodeExec.SecretID,
			ExpectedError: "",
		},
		{
			Name:          "root token",
			Token:         root.SecretID,
			ExpectedError: "",
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {

			// Make the request
			req := &cstructs.AllocExecRequest{
				AllocID: allocID,
				Task:    job.TaskGroups[0].Tasks[0].Name,
				Tty:     true,
				Cmd:     []string{"placeholder command"},
				QueryOptions: nstructs.QueryOptions{
					Region:    "global",
					AuthToken: c.Token,
					Namespace: nstructs.DefaultNamespace,
				},
			}

			// Get the handler
			handler, err := client.StreamingRpcHandler("Allocations.Exec")
			require.Nil(t, err)

			// Create a pipe
			p1, p2 := net.Pipe()
			defer p1.Close()
			defer p2.Close()

			errCh := make(chan error)
			frames := make(chan *drivers.ExecTaskStreamingResponseMsg)

			// Start the handler
			go handler(p2)
			go decodeFrames(t, p1, frames, errCh)

			// Send the request
			encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle)
			require.Nil(t, encoder.Encode(req))

			select {
			case <-time.After(3 * time.Second):
			case err := <-errCh:
				if c.ExpectedError == "" {
					require.NoError(t, err)
				} else {
					require.Contains(t, err.Error(), c.ExpectedError)
				}
			case f := <-frames:
				// we are good if we don't expect an error
				if c.ExpectedError != "" {
					require.Fail(t, "unexpected frame", "frame: %#v", f)
				}
			}
		})
	}
}

// TestAlloc_ExecStreaming_ACL_WithIsolation_None asserts that a token also
// needs the alloc-node-exec ACL capability when no filesystem isolation is used.
func TestAlloc_ExecStreaming_ACL_WithIsolation_None(t *testing.T) {
	t.Parallel()
	isolation := drivers.FSIsolationNone

	// Start a server and client
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}

		pluginConfig := []*nconfig.PluginConfig{
			{
				Name: "mock_driver",
				Config: map[string]interface{}{
					"fs_isolation": string(isolation),
				},
			},
		}

		c.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", map[string]string{}, pluginConfig)
	})
	defer cleanup()

	// Create a bad token
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

	policyAllocExec := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityAllocExec})
	tokenAllocExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "alloc-exec", policyAllocExec)

	policyAllocNodeExec := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityAllocExec, acl.NamespaceCapabilityAllocNodeExec})
	tokenAllocNodeExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "alloc-node-exec", policyAllocNodeExec)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
		"exec_command": map[string]interface{}{
			"run_for":       "1ms",
			"stdout_string": "some output",
		},
	}

	// Wait for the client to be running the job
	testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)

	// Get the allocation ID
	args := nstructs.AllocListRequest{}
	args.Region = "global"
	args.AuthToken = root.SecretID
	args.Namespace = nstructs.DefaultNamespace
	resp := nstructs.AllocListResponse{}
	require.NoError(t, s.RPC("Alloc.List", &args, &resp))
	require.Len(t, resp.Allocations, 1)
	allocID := resp.Allocations[0].ID

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:          "alloc-exec token",
			Token:         tokenAllocExec.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:          "alloc-node-exec token",
			Token:         tokenAllocNodeExec.SecretID,
			ExpectedError: "",
		},
		{
			Name:          "root token",
			Token:         root.SecretID,
			ExpectedError: "",
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {

			// Make the request
			req := &cstructs.AllocExecRequest{
				AllocID: allocID,
				Task:    job.TaskGroups[0].Tasks[0].Name,
				Tty:     true,
				Cmd:     []string{"placeholder command"},
				QueryOptions: nstructs.QueryOptions{
					Region:    "global",
					AuthToken: c.Token,
					Namespace: nstructs.DefaultNamespace,
				},
			}

			// Get the handler
			handler, err := client.StreamingRpcHandler("Allocations.Exec")
			require.Nil(t, err)

			// Create a pipe
			p1, p2 := net.Pipe()
			defer p1.Close()
			defer p2.Close()

			errCh := make(chan error)
			frames := make(chan *drivers.ExecTaskStreamingResponseMsg)

			// Start the handler
			go handler(p2)
			go decodeFrames(t, p1, frames, errCh)

			// Send the request
			encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle)
			require.Nil(t, encoder.Encode(req))

			select {
			case <-time.After(3 * time.Second):
			case err := <-errCh:
				if c.ExpectedError == "" {
					require.NoError(t, err)
				} else {
					require.Contains(t, err.Error(), c.ExpectedError)
				}
			case f := <-frames:
				// we are good if we don't expect an error
				if c.ExpectedError != "" {
					require.Fail(t, "unexpected frame", "frame: %#v", f)
				}
			}
		})
	}
}

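// decodeFrames reads StreamErrWrapper messages from p1, forwarding any stream
// errors to errCh and decoded exec frames to the frames channel until the
// connection is closed or decoding fails.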
func decodeFrames(t *testing.T, p1 net.Conn, frames chan<- *drivers.ExecTaskStreamingResponseMsg, errCh chan<- error) {
	// Start the decoder
	decoder := codec.NewDecoder(p1, nstructs.MsgpackHandle)

	for {
		var msg cstructs.StreamErrWrapper
		if err := decoder.Decode(&msg); err != nil {
			if err == io.EOF || strings.Contains(err.Error(), "closed") {
				return
			}
			t.Logf("received error decoding: %#v", err)

			errCh <- fmt.Errorf("error decoding: %v", err)
			return
		}

		if msg.Error != nil {
			errCh <- msg.Error
			continue
		}

		var frame drivers.ExecTaskStreamingResponseMsg
		if err := json.Unmarshal(msg.Payload, &frame); err != nil {
			errCh <- err
			return
		}
		t.Logf("received message: %#v", msg)
		frames <- &frame
	}
}