1package nomad
2
3import (
4	"fmt"
5	"io"
6	"net"
7	"strings"
8	"testing"
9	"time"
10
11	codec "github.com/hashicorp/go-msgpack/codec"
12	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
13	"github.com/hashicorp/nomad/acl"
14	"github.com/hashicorp/nomad/client"
15	"github.com/hashicorp/nomad/client/config"
16	cstructs "github.com/hashicorp/nomad/client/structs"
17	"github.com/hashicorp/nomad/helper/uuid"
18	"github.com/hashicorp/nomad/nomad/mock"
19	"github.com/hashicorp/nomad/nomad/structs"
20	"github.com/hashicorp/nomad/testutil"
21	"github.com/stretchr/testify/require"
22)
23
24func TestClientFS_List_Local(t *testing.T) {
25	t.Parallel()
26	require := require.New(t)
27
28	// Start a server and client
29	s, cleanupS := TestServer(t, nil)
30	defer cleanupS()
31	codec := rpcClient(t, s)
32	testutil.WaitForLeader(t, s.RPC)
33
34	c, cleanupC := client.TestClient(t, func(c *config.Config) {
35		c.Servers = []string{s.config.RPCAddr.String()}
36	})
37	defer cleanupC()
38
39	// Force an allocation onto the node
40	a := mock.Alloc()
41	a.Job.Type = structs.JobTypeBatch
42	a.NodeID = c.NodeID()
43	a.Job.TaskGroups[0].Count = 1
44	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
45		Name:   "web",
46		Driver: "mock_driver",
47		Config: map[string]interface{}{
48			"run_for": "2s",
49		},
50		LogConfig: structs.DefaultLogConfig(),
51		Resources: &structs.Resources{
52			CPU:      500,
53			MemoryMB: 256,
54		},
55	}
56
57	// Wait for the client to connect
58	testutil.WaitForResult(func() (bool, error) {
59		nodes := s.connectedNodes()
60		return len(nodes) == 1, nil
61	}, func(err error) {
62		t.Fatalf("should have a clients")
63	})
64
65	// Upsert the allocation
66	state := s.State()
67	require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
68	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
69
70	// Wait for the client to run the allocation
71	testutil.WaitForResult(func() (bool, error) {
72		alloc, err := state.AllocByID(nil, a.ID)
73		if err != nil {
74			return false, err
75		}
76		if alloc == nil {
77			return false, fmt.Errorf("unknown alloc")
78		}
79		if alloc.ClientStatus != structs.AllocClientStatusComplete {
80			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
81		}
82
83		return true, nil
84	}, func(err error) {
85		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
86	})
87
88	// Make the request without having a node-id
89	req := &cstructs.FsListRequest{
90		Path:         "/",
91		QueryOptions: structs.QueryOptions{Region: "global"},
92	}
93
94	// Fetch the response
95	var resp cstructs.FsListResponse
96	err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp)
97	require.NotNil(err)
98	require.Contains(err.Error(), "missing")
99
100	// Fetch the response setting the alloc id
101	req.AllocID = a.ID
102	var resp2 cstructs.FsListResponse
103	err = msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp2)
104	require.Nil(err)
105	require.NotEmpty(resp2.Files)
106}
107
108func TestClientFS_List_ACL(t *testing.T) {
109	t.Parallel()
110
111	// Start a server
112	s, root, cleanupS := TestACLServer(t, nil)
113	defer cleanupS()
114	codec := rpcClient(t, s)
115	testutil.WaitForLeader(t, s.RPC)
116
117	// Create a bad token
118	policyBad := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDeny})
119	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
120
121	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS})
122	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
123
124	// Upsert the allocation
125	state := s.State()
126	alloc := mock.Alloc()
127	require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1010, alloc.Job))
128	require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1011, []*structs.Allocation{alloc}))
129
130	cases := []struct {
131		Name          string
132		Token         string
133		ExpectedError string
134	}{
135		{
136			Name:          "bad token",
137			Token:         tokenBad.SecretID,
138			ExpectedError: structs.ErrPermissionDenied.Error(),
139		},
140		{
141			Name:          "good token",
142			Token:         tokenGood.SecretID,
143			ExpectedError: structs.ErrUnknownNodePrefix,
144		},
145		{
146			Name:          "root token",
147			Token:         root.SecretID,
148			ExpectedError: structs.ErrUnknownNodePrefix,
149		},
150	}
151
152	for _, c := range cases {
153		t.Run(c.Name, func(t *testing.T) {
154
155			// Make the request
156			req := &cstructs.FsListRequest{
157				AllocID: alloc.ID,
158				Path:    "/",
159				QueryOptions: structs.QueryOptions{
160					Region:    "global",
161					Namespace: structs.DefaultNamespace,
162					AuthToken: c.Token,
163				},
164			}
165
166			// Fetch the response
167			var resp cstructs.FsListResponse
168			err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp)
169			require.NotNil(t, err)
170			require.Contains(t, err.Error(), c.ExpectedError)
171		})
172	}
173}
174
175func TestClientFS_List_Remote(t *testing.T) {
176	t.Parallel()
177	require := require.New(t)
178
179	// Start a server and client
180	s1, cleanupS1 := TestServer(t, func(c *Config) {
181		c.BootstrapExpect = 2
182	})
183	defer cleanupS1()
184	s2, cleanupS2 := TestServer(t, func(c *Config) {
185		c.BootstrapExpect = 2
186	})
187	defer cleanupS2()
188	TestJoin(t, s1, s2)
189	testutil.WaitForLeader(t, s1.RPC)
190	testutil.WaitForLeader(t, s2.RPC)
191	codec := rpcClient(t, s2)
192
193	c, cleanupC := client.TestClient(t, func(c *config.Config) {
194		c.Servers = []string{s2.config.RPCAddr.String()}
195	})
196	defer cleanupC()
197
198	// Force an allocation onto the node
199	a := mock.Alloc()
200	a.Job.Type = structs.JobTypeBatch
201	a.NodeID = c.NodeID()
202	a.Job.TaskGroups[0].Count = 1
203	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
204		Name:   "web",
205		Driver: "mock_driver",
206		Config: map[string]interface{}{
207			"run_for": "2s",
208		},
209		LogConfig: structs.DefaultLogConfig(),
210		Resources: &structs.Resources{
211			CPU:      500,
212			MemoryMB: 256,
213		},
214	}
215
216	// Wait for the client to connect
217	testutil.WaitForResult(func() (bool, error) {
218		nodes := s2.connectedNodes()
219		return len(nodes) == 1, nil
220	}, func(err error) {
221		t.Fatalf("should have a clients")
222	})
223
224	// Upsert the allocation
225	state1 := s1.State()
226	state2 := s2.State()
227	require.Nil(state1.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
228	require.Nil(state1.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
229	require.Nil(state2.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
230	require.Nil(state2.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
231
232	// Wait for the client to run the allocation
233	testutil.WaitForResult(func() (bool, error) {
234		alloc, err := state2.AllocByID(nil, a.ID)
235		if err != nil {
236			return false, err
237		}
238		if alloc == nil {
239			return false, fmt.Errorf("unknown alloc")
240		}
241		if alloc.ClientStatus != structs.AllocClientStatusComplete {
242			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
243		}
244
245		return true, nil
246	}, func(err error) {
247		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
248	})
249
250	// Force remove the connection locally in case it exists
251	s1.nodeConnsLock.Lock()
252	delete(s1.nodeConns, c.NodeID())
253	s1.nodeConnsLock.Unlock()
254
255	// Make the request without having a node-id
256	req := &cstructs.FsListRequest{
257		AllocID:      a.ID,
258		Path:         "/",
259		QueryOptions: structs.QueryOptions{Region: "global"},
260	}
261
262	// Fetch the response
263	var resp cstructs.FsListResponse
264	err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp)
265	require.Nil(err)
266	require.NotEmpty(resp.Files)
267}
268
269func TestClientFS_Stat_OldNode(t *testing.T) {
270	t.Parallel()
271	require := require.New(t)
272
273	// Start a server
274	s, cleanupS := TestServer(t, nil)
275	defer cleanupS()
276	state := s.State()
277	codec := rpcClient(t, s)
278	testutil.WaitForLeader(t, s.RPC)
279
280	// Test for an old version error
281	node := mock.Node()
282	node.Attributes["nomad.version"] = "0.7.1"
283	require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1005, node))
284
285	alloc := mock.Alloc()
286	alloc.NodeID = node.ID
287	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1006, []*structs.Allocation{alloc}))
288
289	req := &cstructs.FsStatRequest{
290		AllocID:      alloc.ID,
291		Path:         "/",
292		QueryOptions: structs.QueryOptions{Region: "global"},
293	}
294
295	var resp cstructs.FsStatResponse
296	err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
297	require.True(structs.IsErrNodeLacksRpc(err), err.Error())
298}
299
300func TestClientFS_Stat_Local(t *testing.T) {
301	t.Parallel()
302	require := require.New(t)
303
304	// Start a server and client
305	s, cleanupS := TestServer(t, nil)
306	defer cleanupS()
307	codec := rpcClient(t, s)
308	testutil.WaitForLeader(t, s.RPC)
309
310	c, cleanupC := client.TestClient(t, func(c *config.Config) {
311		c.Servers = []string{s.config.RPCAddr.String()}
312	})
313	defer cleanupC()
314
315	// Force an allocation onto the node
316	a := mock.Alloc()
317	a.Job.Type = structs.JobTypeBatch
318	a.NodeID = c.NodeID()
319	a.Job.TaskGroups[0].Count = 1
320	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
321		Name:   "web",
322		Driver: "mock_driver",
323		Config: map[string]interface{}{
324			"run_for": "2s",
325		},
326		LogConfig: structs.DefaultLogConfig(),
327		Resources: &structs.Resources{
328			CPU:      500,
329			MemoryMB: 256,
330		},
331	}
332
333	// Wait for the client to connect
334	testutil.WaitForResult(func() (bool, error) {
335		nodes := s.connectedNodes()
336		return len(nodes) == 1, nil
337	}, func(err error) {
338		t.Fatalf("should have a clients")
339	})
340
341	// Upsert the allocation
342	state := s.State()
343	require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
344	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
345
346	// Wait for the client to run the allocation
347	testutil.WaitForResult(func() (bool, error) {
348		alloc, err := state.AllocByID(nil, a.ID)
349		if err != nil {
350			return false, err
351		}
352		if alloc == nil {
353			return false, fmt.Errorf("unknown alloc")
354		}
355		if alloc.ClientStatus != structs.AllocClientStatusComplete {
356			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
357		}
358
359		return true, nil
360	}, func(err error) {
361		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
362	})
363
364	// Make the request without having a node-id
365	req := &cstructs.FsStatRequest{
366		Path:         "/",
367		QueryOptions: structs.QueryOptions{Region: "global"},
368	}
369
370	// Fetch the response
371	var resp cstructs.FsStatResponse
372	err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
373	require.NotNil(err)
374	require.Contains(err.Error(), "missing")
375
376	// Fetch the response setting the alloc id
377	req.AllocID = a.ID
378	var resp2 cstructs.FsStatResponse
379	err = msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp2)
380	require.Nil(err)
381	require.NotNil(resp2.Info)
382}
383
384func TestClientFS_Stat_ACL(t *testing.T) {
385	t.Parallel()
386
387	// Start a server
388	s, root, cleanupS := TestACLServer(t, nil)
389	defer cleanupS()
390	codec := rpcClient(t, s)
391	testutil.WaitForLeader(t, s.RPC)
392
393	// Create a bad token
394	policyBad := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDeny})
395	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
396
397	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS})
398	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
399
400	// Upsert the allocation
401	state := s.State()
402	alloc := mock.Alloc()
403	require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1010, alloc.Job))
404	require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1011, []*structs.Allocation{alloc}))
405
406	cases := []struct {
407		Name          string
408		Token         string
409		ExpectedError string
410	}{
411		{
412			Name:          "bad token",
413			Token:         tokenBad.SecretID,
414			ExpectedError: structs.ErrPermissionDenied.Error(),
415		},
416		{
417			Name:          "good token",
418			Token:         tokenGood.SecretID,
419			ExpectedError: structs.ErrUnknownNodePrefix,
420		},
421		{
422			Name:          "root token",
423			Token:         root.SecretID,
424			ExpectedError: structs.ErrUnknownNodePrefix,
425		},
426	}
427
428	for _, c := range cases {
429		t.Run(c.Name, func(t *testing.T) {
430
431			// Make the request
432			req := &cstructs.FsStatRequest{
433				AllocID: alloc.ID,
434				Path:    "/",
435				QueryOptions: structs.QueryOptions{
436					Region:    "global",
437					Namespace: structs.DefaultNamespace,
438					AuthToken: c.Token,
439				},
440			}
441
442			// Fetch the response
443			var resp cstructs.FsStatResponse
444			err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
445			require.NotNil(t, err)
446			require.Contains(t, err.Error(), c.ExpectedError)
447		})
448	}
449}
450
451func TestClientFS_Stat_Remote(t *testing.T) {
452	t.Parallel()
453	require := require.New(t)
454
455	// Start a server and client
456	s1, cleanupS1 := TestServer(t, func(c *Config) {
457		c.BootstrapExpect = 2
458	})
459	defer cleanupS1()
460	s2, cleanupS2 := TestServer(t, func(c *Config) {
461		c.BootstrapExpect = 2
462	})
463	defer cleanupS2()
464	TestJoin(t, s1, s2)
465	testutil.WaitForLeader(t, s1.RPC)
466	testutil.WaitForLeader(t, s2.RPC)
467	codec := rpcClient(t, s2)
468
469	c, cleanup := client.TestClient(t, func(c *config.Config) {
470		c.Servers = []string{s2.config.RPCAddr.String()}
471	})
472	defer cleanup()
473
474	// Force an allocation onto the node
475	a := mock.Alloc()
476	a.Job.Type = structs.JobTypeBatch
477	a.NodeID = c.NodeID()
478	a.Job.TaskGroups[0].Count = 1
479	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
480		Name:   "web",
481		Driver: "mock_driver",
482		Config: map[string]interface{}{
483			"run_for": "2s",
484		},
485		LogConfig: structs.DefaultLogConfig(),
486		Resources: &structs.Resources{
487			CPU:      500,
488			MemoryMB: 256,
489		},
490	}
491
492	// Wait for the client to connect
493	testutil.WaitForResult(func() (bool, error) {
494		nodes := s2.connectedNodes()
495		return len(nodes) == 1, nil
496	}, func(err error) {
497		t.Fatalf("should have a clients")
498	})
499
500	// Upsert the allocation
501	state1 := s1.State()
502	state2 := s2.State()
503	require.Nil(state1.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
504	require.Nil(state1.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
505	require.Nil(state2.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
506	require.Nil(state2.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
507
508	// Wait for the client to run the allocation
509	testutil.WaitForResult(func() (bool, error) {
510		alloc, err := state2.AllocByID(nil, a.ID)
511		if err != nil {
512			return false, err
513		}
514		if alloc == nil {
515			return false, fmt.Errorf("unknown alloc")
516		}
517		if alloc.ClientStatus != structs.AllocClientStatusComplete {
518			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
519		}
520
521		return true, nil
522	}, func(err error) {
523		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
524	})
525
526	// Force remove the connection locally in case it exists
527	s1.nodeConnsLock.Lock()
528	delete(s1.nodeConns, c.NodeID())
529	s1.nodeConnsLock.Unlock()
530
531	// Make the request without having a node-id
532	req := &cstructs.FsStatRequest{
533		AllocID:      a.ID,
534		Path:         "/",
535		QueryOptions: structs.QueryOptions{Region: "global"},
536	}
537
538	// Fetch the response
539	var resp cstructs.FsStatResponse
540	err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
541	require.Nil(err)
542	require.NotNil(resp.Info)
543}
544
545func TestClientFS_Streaming_NoAlloc(t *testing.T) {
546	t.Parallel()
547	require := require.New(t)
548
549	// Start a server and client
550	s, cleanupS := TestServer(t, nil)
551	defer cleanupS()
552	testutil.WaitForLeader(t, s.RPC)
553
554	// Make the request with bad allocation id
555	req := &cstructs.FsStreamRequest{
556		AllocID:      uuid.Generate(),
557		QueryOptions: structs.QueryOptions{Region: "global"},
558	}
559
560	// Get the handler
561	handler, err := s.StreamingRpcHandler("FileSystem.Stream")
562	require.Nil(err)
563
564	// Create a pipe
565	p1, p2 := net.Pipe()
566	defer p1.Close()
567	defer p2.Close()
568
569	errCh := make(chan error)
570	streamMsg := make(chan *cstructs.StreamErrWrapper)
571
572	// Start the handler
573	go handler(p2)
574
575	// Start the decoder
576	go func() {
577		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
578		for {
579			var msg cstructs.StreamErrWrapper
580			if err := decoder.Decode(&msg); err != nil {
581				if err == io.EOF || strings.Contains(err.Error(), "closed") {
582					return
583				}
584				errCh <- fmt.Errorf("error decoding: %v", err)
585			}
586
587			streamMsg <- &msg
588		}
589	}()
590
591	// Send the request
592	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
593	require.Nil(encoder.Encode(req))
594
595	timeout := time.After(5 * time.Second)
596
597OUTER:
598	for {
599		select {
600		case <-timeout:
601			t.Fatal("timeout")
602		case err := <-errCh:
603			t.Fatal(err)
604		case msg := <-streamMsg:
605			if msg.Error == nil {
606				continue
607			}
608
609			if structs.IsErrUnknownAllocation(msg.Error) {
610				break OUTER
611			}
612		}
613	}
614}
615
616func TestClientFS_Streaming_ACL(t *testing.T) {
617	t.Parallel()
618
619	// Start a server
620	s, root, cleanupS := TestACLServer(t, nil)
621	defer cleanupS()
622	testutil.WaitForLeader(t, s.RPC)
623
624	// Create a bad token
625	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
626	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
627
628	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
629		[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
630	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
631
632	// Upsert the allocation
633	state := s.State()
634	alloc := mock.Alloc()
635	require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1010, alloc.Job))
636	require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1011, []*structs.Allocation{alloc}))
637
638	cases := []struct {
639		Name          string
640		Token         string
641		ExpectedError string
642	}{
643		{
644			Name:          "bad token",
645			Token:         tokenBad.SecretID,
646			ExpectedError: structs.ErrPermissionDenied.Error(),
647		},
648		{
649			Name:          "good token",
650			Token:         tokenGood.SecretID,
651			ExpectedError: structs.ErrUnknownNodePrefix,
652		},
653		{
654			Name:          "root token",
655			Token:         root.SecretID,
656			ExpectedError: structs.ErrUnknownNodePrefix,
657		},
658	}
659
660	for _, c := range cases {
661		t.Run(c.Name, func(t *testing.T) {
662			// Make the request with bad allocation id
663			req := &cstructs.FsStreamRequest{
664				AllocID: alloc.ID,
665				QueryOptions: structs.QueryOptions{
666					Namespace: structs.DefaultNamespace,
667					Region:    "global",
668					AuthToken: c.Token,
669				},
670			}
671
672			// Get the handler
673			handler, err := s.StreamingRpcHandler("FileSystem.Stream")
674			require.NoError(t, err)
675
676			// Create a pipe
677			p1, p2 := net.Pipe()
678			defer p1.Close()
679			defer p2.Close()
680
681			errCh := make(chan error)
682			streamMsg := make(chan *cstructs.StreamErrWrapper)
683
684			// Start the handler
685			go handler(p2)
686
687			// Start the decoder
688			go func() {
689				decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
690				for {
691					var msg cstructs.StreamErrWrapper
692					if err := decoder.Decode(&msg); err != nil {
693						if err == io.EOF || strings.Contains(err.Error(), "closed") {
694							return
695						}
696						errCh <- fmt.Errorf("error decoding: %v", err)
697					}
698
699					streamMsg <- &msg
700				}
701			}()
702
703			// Send the request
704			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
705			require.NoError(t, encoder.Encode(req))
706
707			timeout := time.After(5 * time.Second)
708
709		OUTER:
710			for {
711				select {
712				case <-timeout:
713					t.Fatal("timeout")
714				case err := <-errCh:
715					t.Fatal(err)
716				case msg := <-streamMsg:
717					if msg.Error == nil {
718						continue
719					}
720
721					if strings.Contains(msg.Error.Error(), c.ExpectedError) {
722						break OUTER
723					} else {
724						t.Fatalf("Bad error: %v", msg.Error)
725					}
726				}
727			}
728		})
729	}
730}
731
732func TestClientFS_Streaming_Local(t *testing.T) {
733	t.Parallel()
734	require := require.New(t)
735
736	// Start a server and client
737	s, cleanupS := TestServer(t, nil)
738	defer cleanupS()
739	testutil.WaitForLeader(t, s.RPC)
740
741	c, cleanup := client.TestClient(t, func(c *config.Config) {
742		c.Servers = []string{s.config.RPCAddr.String()}
743	})
744	defer cleanup()
745
746	// Force an allocation onto the node
747	expected := "Hello from the other side"
748	a := mock.Alloc()
749	a.Job.Type = structs.JobTypeBatch
750	a.NodeID = c.NodeID()
751	a.Job.TaskGroups[0].Count = 1
752	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
753		Name:   "web",
754		Driver: "mock_driver",
755		Config: map[string]interface{}{
756			"run_for":       "2s",
757			"stdout_string": expected,
758		},
759		LogConfig: structs.DefaultLogConfig(),
760		Resources: &structs.Resources{
761			CPU:      500,
762			MemoryMB: 256,
763		},
764	}
765
766	// Wait for the client to connect
767	testutil.WaitForResult(func() (bool, error) {
768		nodes := s.connectedNodes()
769		return len(nodes) == 1, nil
770	}, func(err error) {
771		t.Fatalf("should have a clients")
772	})
773
774	// Upsert the allocation
775	state := s.State()
776	require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
777	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
778
779	// Wait for the client to run the allocation
780	testutil.WaitForResult(func() (bool, error) {
781		alloc, err := state.AllocByID(nil, a.ID)
782		if err != nil {
783			return false, err
784		}
785		if alloc == nil {
786			return false, fmt.Errorf("unknown alloc")
787		}
788		if alloc.ClientStatus != structs.AllocClientStatusComplete {
789			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
790		}
791
792		return true, nil
793	}, func(err error) {
794		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
795	})
796
797	// Make the request
798	req := &cstructs.FsStreamRequest{
799		AllocID:      a.ID,
800		Path:         "alloc/logs/web.stdout.0",
801		Origin:       "start",
802		PlainText:    true,
803		QueryOptions: structs.QueryOptions{Region: "global"},
804	}
805
806	// Get the handler
807	handler, err := s.StreamingRpcHandler("FileSystem.Stream")
808	require.Nil(err)
809
810	// Create a pipe
811	p1, p2 := net.Pipe()
812	defer p1.Close()
813	defer p2.Close()
814
815	errCh := make(chan error)
816	streamMsg := make(chan *cstructs.StreamErrWrapper)
817
818	// Start the handler
819	go handler(p2)
820
821	// Start the decoder
822	go func() {
823		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
824		for {
825			var msg cstructs.StreamErrWrapper
826			if err := decoder.Decode(&msg); err != nil {
827				if err == io.EOF || strings.Contains(err.Error(), "closed") {
828					return
829				}
830				errCh <- fmt.Errorf("error decoding: %v", err)
831			}
832
833			streamMsg <- &msg
834		}
835	}()
836
837	// Send the request
838	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
839	require.Nil(encoder.Encode(req))
840
841	timeout := time.After(3 * time.Second)
842	received := ""
843OUTER:
844	for {
845		select {
846		case <-timeout:
847			t.Fatal("timeout")
848		case err := <-errCh:
849			t.Fatal(err)
850		case msg := <-streamMsg:
851			if msg.Error != nil {
852				t.Fatalf("Got error: %v", msg.Error.Error())
853			}
854
855			// Add the payload
856			received += string(msg.Payload)
857			if received == expected {
858				break OUTER
859			}
860		}
861	}
862}
863
864func TestClientFS_Streaming_Local_Follow(t *testing.T) {
865	t.Parallel()
866	require := require.New(t)
867
868	// Start a server and client
869	s, cleanupS := TestServer(t, nil)
870	defer cleanupS()
871	testutil.WaitForLeader(t, s.RPC)
872
873	c, cleanupC := client.TestClient(t, func(c *config.Config) {
874		c.Servers = []string{s.config.RPCAddr.String()}
875	})
876	defer cleanupC()
877
878	// Force an allocation onto the node
879	expectedBase := "Hello from the other side"
880	repeat := 10
881
882	a := mock.Alloc()
883	a.Job.Type = structs.JobTypeBatch
884	a.NodeID = c.NodeID()
885	a.Job.TaskGroups[0].Count = 1
886	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
887		Name:   "web",
888		Driver: "mock_driver",
889		Config: map[string]interface{}{
890			"run_for":                "3s",
891			"stdout_string":          expectedBase,
892			"stdout_repeat":          repeat,
893			"stdout_repeat_duration": "200ms",
894		},
895		LogConfig: structs.DefaultLogConfig(),
896		Resources: &structs.Resources{
897			CPU:      500,
898			MemoryMB: 256,
899		},
900	}
901
902	// Wait for the client to connect
903	testutil.WaitForResult(func() (bool, error) {
904		nodes := s.connectedNodes()
905		return len(nodes) == 1, nil
906	}, func(err error) {
907		t.Fatalf("should have a clients")
908	})
909
910	// Upsert the allocation
911	state := s.State()
912	require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
913	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
914
915	// Wait for the client to run the allocation
916	testutil.WaitForResult(func() (bool, error) {
917		alloc, err := state.AllocByID(nil, a.ID)
918		if err != nil {
919			return false, err
920		}
921		if alloc == nil {
922			return false, fmt.Errorf("unknown alloc")
923		}
924		if alloc.ClientStatus != structs.AllocClientStatusRunning {
925			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
926		}
927
928		return true, nil
929	}, func(err error) {
930		t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err)
931	})
932
933	// Make the request
934	req := &cstructs.FsStreamRequest{
935		AllocID:      a.ID,
936		Path:         "alloc/logs/web.stdout.0",
937		Origin:       "start",
938		PlainText:    true,
939		Follow:       true,
940		QueryOptions: structs.QueryOptions{Region: "global"},
941	}
942
943	// Get the handler
944	handler, err := s.StreamingRpcHandler("FileSystem.Stream")
945	require.Nil(err)
946
947	// Create a pipe
948	p1, p2 := net.Pipe()
949	defer p1.Close()
950	defer p2.Close()
951
952	errCh := make(chan error)
953	streamMsg := make(chan *cstructs.StreamErrWrapper)
954
955	// Start the handler
956	go handler(p2)
957
958	// Start the decoder
959	go func() {
960		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
961		for {
962			var msg cstructs.StreamErrWrapper
963			if err := decoder.Decode(&msg); err != nil {
964				if err == io.EOF || strings.Contains(err.Error(), "closed") {
965					return
966				}
967				errCh <- fmt.Errorf("error decoding: %v", err)
968			}
969
970			streamMsg <- &msg
971		}
972	}()
973
974	// Send the request
975	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
976	require.Nil(encoder.Encode(req))
977
978	timeout := time.After(20 * time.Second)
979	expected := strings.Repeat(expectedBase, repeat+1)
980	received := ""
981OUTER:
982	for {
983		select {
984		case <-timeout:
985			t.Fatal("timeout")
986		case err := <-errCh:
987			t.Fatal(err)
988		case msg := <-streamMsg:
989			if msg.Error != nil {
990				t.Fatalf("Got error: %v", msg.Error.Error())
991			}
992
993			// Add the payload
994			received += string(msg.Payload)
995			if received == expected {
996				break OUTER
997			}
998		}
999	}
1000}
1001
1002func TestClientFS_Streaming_Remote_Server(t *testing.T) {
1003	t.Parallel()
1004	require := require.New(t)
1005
1006	// Start a server and client
1007	s1, cleanupS1 := TestServer(t, func(c *Config) {
1008		c.BootstrapExpect = 2
1009	})
1010	defer cleanupS1()
1011	s2, cleanupS2 := TestServer(t, func(c *Config) {
1012		c.BootstrapExpect = 2
1013	})
1014	defer cleanupS2()
1015	TestJoin(t, s1, s2)
1016	testutil.WaitForLeader(t, s1.RPC)
1017	testutil.WaitForLeader(t, s2.RPC)
1018
1019	c, cleanupC := client.TestClient(t, func(c *config.Config) {
1020		c.Servers = []string{s2.config.RPCAddr.String()}
1021	})
1022	defer cleanupC()
1023
1024	// Force an allocation onto the node
1025	expected := "Hello from the other side"
1026	a := mock.Alloc()
1027	a.Job.Type = structs.JobTypeBatch
1028	a.NodeID = c.NodeID()
1029	a.Job.TaskGroups[0].Count = 1
1030	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
1031		Name:   "web",
1032		Driver: "mock_driver",
1033		Config: map[string]interface{}{
1034			"run_for":       "2s",
1035			"stdout_string": expected,
1036		},
1037		LogConfig: structs.DefaultLogConfig(),
1038		Resources: &structs.Resources{
1039			CPU:      500,
1040			MemoryMB: 256,
1041		},
1042	}
1043
1044	// Wait for the client to connect
1045	testutil.WaitForResult(func() (bool, error) {
1046		nodes := s2.connectedNodes()
1047		return len(nodes) == 1, nil
1048	}, func(err error) {
1049		t.Fatalf("should have a clients")
1050	})
1051
1052	// Upsert the allocation
1053	state1 := s1.State()
1054	state2 := s2.State()
1055	require.Nil(state1.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
1056	require.Nil(state1.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
1057	require.Nil(state2.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
1058	require.Nil(state2.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
1059
1060	// Wait for the client to run the allocation
1061	testutil.WaitForResult(func() (bool, error) {
1062		alloc, err := state2.AllocByID(nil, a.ID)
1063		if err != nil {
1064			return false, err
1065		}
1066		if alloc == nil {
1067			return false, fmt.Errorf("unknown alloc")
1068		}
1069		if alloc.ClientStatus != structs.AllocClientStatusComplete {
1070			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
1071		}
1072
1073		return true, nil
1074	}, func(err error) {
1075		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
1076	})
1077
1078	// Force remove the connection locally in case it exists
1079	s1.nodeConnsLock.Lock()
1080	delete(s1.nodeConns, c.NodeID())
1081	s1.nodeConnsLock.Unlock()
1082
1083	// Make the request
1084	req := &cstructs.FsStreamRequest{
1085		AllocID:      a.ID,
1086		Path:         "alloc/logs/web.stdout.0",
1087		Origin:       "start",
1088		PlainText:    true,
1089		QueryOptions: structs.QueryOptions{Region: "global"},
1090	}
1091
1092	// Get the handler
1093	handler, err := s1.StreamingRpcHandler("FileSystem.Stream")
1094	require.Nil(err)
1095
1096	// Create a pipe
1097	p1, p2 := net.Pipe()
1098	defer p1.Close()
1099	defer p2.Close()
1100
1101	errCh := make(chan error)
1102	streamMsg := make(chan *cstructs.StreamErrWrapper)
1103
1104	// Start the handler
1105	go handler(p2)
1106
1107	// Start the decoder
1108	go func() {
1109		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
1110		for {
1111			var msg cstructs.StreamErrWrapper
1112			if err := decoder.Decode(&msg); err != nil {
1113				if err == io.EOF || strings.Contains(err.Error(), "closed") {
1114					return
1115				}
1116				errCh <- fmt.Errorf("error decoding: %v", err)
1117			}
1118
1119			streamMsg <- &msg
1120		}
1121	}()
1122
1123	// Send the request
1124	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
1125	require.Nil(encoder.Encode(req))
1126
1127	timeout := time.After(3 * time.Second)
1128	received := ""
1129OUTER:
1130	for {
1131		select {
1132		case <-timeout:
1133			t.Fatal("timeout")
1134		case err := <-errCh:
1135			t.Fatal(err)
1136		case msg := <-streamMsg:
1137			if msg.Error != nil {
1138				t.Fatalf("Got error: %v", msg.Error.Error())
1139			}
1140
1141			// Add the payload
1142			received += string(msg.Payload)
1143			if received == expected {
1144				break OUTER
1145			}
1146		}
1147	}
1148}
1149
1150func TestClientFS_Streaming_Remote_Region(t *testing.T) {
1151	t.Parallel()
1152	require := require.New(t)
1153
1154	// Start a server and client
1155	s1, cleanupS1 := TestServer(t, nil)
1156	defer cleanupS1()
1157	s2, cleanupS2 := TestServer(t, func(c *Config) {
1158		c.Region = "two"
1159	})
1160	defer cleanupS2()
1161	TestJoin(t, s1, s2)
1162	testutil.WaitForLeader(t, s1.RPC)
1163	testutil.WaitForLeader(t, s2.RPC)
1164
1165	c, cleanupC := client.TestClient(t, func(c *config.Config) {
1166		c.Servers = []string{s2.config.RPCAddr.String()}
1167		c.Region = "two"
1168	})
1169	defer cleanupC()
1170
1171	// Force an allocation onto the node
1172	expected := "Hello from the other side"
1173	a := mock.Alloc()
1174	a.Job.Type = structs.JobTypeBatch
1175	a.NodeID = c.NodeID()
1176	a.Job.TaskGroups[0].Count = 1
1177	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
1178		Name:   "web",
1179		Driver: "mock_driver",
1180		Config: map[string]interface{}{
1181			"run_for":       "2s",
1182			"stdout_string": expected,
1183		},
1184		LogConfig: structs.DefaultLogConfig(),
1185		Resources: &structs.Resources{
1186			CPU:      500,
1187			MemoryMB: 256,
1188		},
1189	}
1190
1191	// Wait for the client to connect
1192	testutil.WaitForResult(func() (bool, error) {
1193		nodes := s2.connectedNodes()
1194		return len(nodes) == 1, nil
1195	}, func(err error) {
1196		t.Fatalf("should have a client")
1197	})
1198
1199	// Upsert the allocation
1200	state2 := s2.State()
1201	require.Nil(state2.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
1202	require.Nil(state2.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
1203
1204	// Wait for the client to run the allocation
1205	testutil.WaitForResult(func() (bool, error) {
1206		alloc, err := state2.AllocByID(nil, a.ID)
1207		if err != nil {
1208			return false, err
1209		}
1210		if alloc == nil {
1211			return false, fmt.Errorf("unknown alloc")
1212		}
1213		if alloc.ClientStatus != structs.AllocClientStatusComplete {
1214			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
1215		}
1216
1217		return true, nil
1218	}, func(err error) {
1219		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
1220	})
1221
1222	// Force remove the connection locally in case it exists
1223	s1.nodeConnsLock.Lock()
1224	delete(s1.nodeConns, c.NodeID())
1225	s1.nodeConnsLock.Unlock()
1226
1227	// Make the request
1228	req := &cstructs.FsStreamRequest{
1229		AllocID:      a.ID,
1230		Path:         "alloc/logs/web.stdout.0",
1231		Origin:       "start",
1232		PlainText:    true,
1233		QueryOptions: structs.QueryOptions{Region: "two"},
1234	}
1235
1236	// Get the handler
1237	handler, err := s1.StreamingRpcHandler("FileSystem.Stream")
1238	require.Nil(err)
1239
1240	// Create a pipe
1241	p1, p2 := net.Pipe()
1242	defer p1.Close()
1243	defer p2.Close()
1244
1245	errCh := make(chan error)
1246	streamMsg := make(chan *cstructs.StreamErrWrapper)
1247
1248	// Start the handler
1249	go handler(p2)
1250
1251	// Start the decoder
1252	go func() {
1253		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
1254		for {
1255			var msg cstructs.StreamErrWrapper
1256			if err := decoder.Decode(&msg); err != nil {
1257				if err == io.EOF || strings.Contains(err.Error(), "closed") {
1258					return
1259				}
1260				errCh <- fmt.Errorf("error decoding: %v", err)
1261			}
1262
1263			streamMsg <- &msg
1264		}
1265	}()
1266
1267	// Send the request
1268	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
1269	require.Nil(encoder.Encode(req))
1270
1271	timeout := time.After(3 * time.Second)
1272	received := ""
1273OUTER:
1274	for {
1275		select {
1276		case <-timeout:
1277			t.Fatal("timeout")
1278		case err := <-errCh:
1279			t.Fatal(err)
1280		case msg := <-streamMsg:
1281			if msg.Error != nil {
1282				t.Fatalf("Got error: %v", msg.Error.Error())
1283			}
1284
1285			// Add the payload
1286			received += string(msg.Payload)
1287			if received == expected {
1288				break OUTER
1289			}
1290		}
1291	}
1292}
1293
1294func TestClientFS_Logs_NoAlloc(t *testing.T) {
1295	t.Parallel()
1296	require := require.New(t)
1297
1298	// Start a server and client
1299	s, cleanupS := TestServer(t, nil)
1300	defer cleanupS()
1301	testutil.WaitForLeader(t, s.RPC)
1302
1303	// Make the request with bad allocation id
1304	req := &cstructs.FsLogsRequest{
1305		AllocID:      uuid.Generate(),
1306		QueryOptions: structs.QueryOptions{Region: "global"},
1307	}
1308
1309	// Get the handler
1310	handler, err := s.StreamingRpcHandler("FileSystem.Logs")
1311	require.Nil(err)
1312
1313	// Create a pipe
1314	p1, p2 := net.Pipe()
1315	defer p1.Close()
1316	defer p2.Close()
1317
1318	errCh := make(chan error)
1319	streamMsg := make(chan *cstructs.StreamErrWrapper)
1320
1321	// Start the handler
1322	go handler(p2)
1323
1324	// Start the decoder
1325	go func() {
1326		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
1327		for {
1328			var msg cstructs.StreamErrWrapper
1329			if err := decoder.Decode(&msg); err != nil {
1330				if err == io.EOF || strings.Contains(err.Error(), "closed") {
1331					return
1332				}
1333				errCh <- fmt.Errorf("error decoding: %v", err)
1334			}
1335
1336			streamMsg <- &msg
1337		}
1338	}()
1339
1340	// Send the request
1341	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
1342	require.Nil(encoder.Encode(req))
1343
1344	timeout := time.After(5 * time.Second)
1345
1346OUTER:
1347	for {
1348		select {
1349		case <-timeout:
1350			t.Fatal("timeout")
1351		case err := <-errCh:
1352			t.Fatal(err)
1353		case msg := <-streamMsg:
1354			if msg.Error == nil {
1355				continue
1356			}
1357
1358			if structs.IsErrUnknownAllocation(msg.Error) {
1359				break OUTER
1360			}
1361		}
1362	}
1363}
1364
1365func TestClientFS_Logs_OldNode(t *testing.T) {
1366	t.Parallel()
1367	require := require.New(t)
1368
1369	// Start a server
1370	s, cleanupS := TestServer(t, nil)
1371	defer cleanupS()
1372	state := s.State()
1373	testutil.WaitForLeader(t, s.RPC)
1374
1375	// Test for an old version error
1376	node := mock.Node()
1377	node.Attributes["nomad.version"] = "0.7.1"
1378	require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1005, node))
1379
1380	alloc := mock.Alloc()
1381	alloc.NodeID = node.ID
1382	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1006, []*structs.Allocation{alloc}))
1383
1384	req := &cstructs.FsLogsRequest{
1385		AllocID:      alloc.ID,
1386		QueryOptions: structs.QueryOptions{Region: "global"},
1387	}
1388
1389	// Get the handler
1390	handler, err := s.StreamingRpcHandler("FileSystem.Logs")
1391	require.Nil(err)
1392
1393	// Create a pipe
1394	p1, p2 := net.Pipe()
1395	defer p1.Close()
1396	defer p2.Close()
1397
1398	errCh := make(chan error)
1399	streamMsg := make(chan *cstructs.StreamErrWrapper)
1400
1401	// Start the handler
1402	go handler(p2)
1403
1404	// Start the decoder
1405	go func() {
1406		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
1407		for {
1408			var msg cstructs.StreamErrWrapper
1409			if err := decoder.Decode(&msg); err != nil {
1410				if err == io.EOF || strings.Contains(err.Error(), "closed") {
1411					return
1412				}
1413				errCh <- fmt.Errorf("error decoding: %v", err)
1414			}
1415
1416			streamMsg <- &msg
1417		}
1418	}()
1419
1420	// Send the request
1421	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
1422	require.Nil(encoder.Encode(req))
1423
1424	timeout := time.After(5 * time.Second)
1425
1426OUTER:
1427	for {
1428		select {
1429		case <-timeout:
1430			t.Fatal("timeout")
1431		case err := <-errCh:
1432			t.Fatal(err)
1433		case msg := <-streamMsg:
1434			if msg.Error == nil {
1435				continue
1436			}
1437
1438			if structs.IsErrNodeLacksRpc(msg.Error) {
1439				break OUTER
1440			}
1441		}
1442	}
1443}
1444
1445func TestClientFS_Logs_ACL(t *testing.T) {
1446	t.Parallel()
1447
1448	// Start a server
1449	s, root, cleanupS := TestACLServer(t, nil)
1450	defer cleanupS()
1451	testutil.WaitForLeader(t, s.RPC)
1452
1453	// Create a bad token
1454	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
1455	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
1456
1457	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
1458		[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
1459	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
1460
1461	// Upsert the allocation
1462	state := s.State()
1463	alloc := mock.Alloc()
1464	require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1010, alloc.Job))
1465	require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1011, []*structs.Allocation{alloc}))
1466
1467	cases := []struct {
1468		Name          string
1469		Token         string
1470		ExpectedError string
1471	}{
1472		{
1473			Name:          "bad token",
1474			Token:         tokenBad.SecretID,
1475			ExpectedError: structs.ErrPermissionDenied.Error(),
1476		},
1477		{
1478			Name:          "good token",
1479			Token:         tokenGood.SecretID,
1480			ExpectedError: structs.ErrUnknownNodePrefix,
1481		},
1482		{
1483			Name:          "root token",
1484			Token:         root.SecretID,
1485			ExpectedError: structs.ErrUnknownNodePrefix,
1486		},
1487	}
1488
1489	for _, c := range cases {
1490		t.Run(c.Name, func(t *testing.T) {
1491			// Make the request with bad allocation id
1492			req := &cstructs.FsLogsRequest{
1493				AllocID: alloc.ID,
1494				QueryOptions: structs.QueryOptions{
1495					Namespace: structs.DefaultNamespace,
1496					Region:    "global",
1497					AuthToken: c.Token,
1498				},
1499			}
1500
1501			// Get the handler
1502			handler, err := s.StreamingRpcHandler("FileSystem.Logs")
1503			require.NoError(t, err)
1504
1505			// Create a pipe
1506			p1, p2 := net.Pipe()
1507			defer p1.Close()
1508			defer p2.Close()
1509
1510			errCh := make(chan error)
1511			streamMsg := make(chan *cstructs.StreamErrWrapper)
1512
1513			// Start the handler
1514			go handler(p2)
1515
1516			// Start the decoder
1517			go func() {
1518				decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
1519				for {
1520					var msg cstructs.StreamErrWrapper
1521					if err := decoder.Decode(&msg); err != nil {
1522						if err == io.EOF || strings.Contains(err.Error(), "closed") {
1523							return
1524						}
1525						errCh <- fmt.Errorf("error decoding: %v", err)
1526					}
1527
1528					streamMsg <- &msg
1529				}
1530			}()
1531
1532			// Send the request
1533			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
1534			require.NoError(t, encoder.Encode(req))
1535
1536			timeout := time.After(5 * time.Second)
1537
1538		OUTER:
1539			for {
1540				select {
1541				case <-timeout:
1542					t.Fatal("timeout")
1543				case err := <-errCh:
1544					t.Fatal(err)
1545				case msg := <-streamMsg:
1546					if msg.Error == nil {
1547						continue
1548					}
1549
1550					if strings.Contains(msg.Error.Error(), c.ExpectedError) {
1551						break OUTER
1552					} else {
1553						t.Fatalf("Bad error: %v", msg.Error)
1554					}
1555				}
1556			}
1557		})
1558	}
1559}
1560
1561func TestClientFS_Logs_Local(t *testing.T) {
1562	t.Parallel()
1563	require := require.New(t)
1564
1565	// Start a server and client
1566	s, cleanupS := TestServer(t, nil)
1567	defer cleanupS()
1568	testutil.WaitForLeader(t, s.RPC)
1569
1570	c, cleanupC := client.TestClient(t, func(c *config.Config) {
1571		c.Servers = []string{s.config.RPCAddr.String()}
1572	})
1573	defer cleanupC()
1574
1575	// Force an allocation onto the node
1576	expected := "Hello from the other side"
1577	a := mock.Alloc()
1578	a.Job.Type = structs.JobTypeBatch
1579	a.NodeID = c.NodeID()
1580	a.Job.TaskGroups[0].Count = 1
1581	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
1582		Name:   "web",
1583		Driver: "mock_driver",
1584		Config: map[string]interface{}{
1585			"run_for":       "2s",
1586			"stdout_string": expected,
1587		},
1588		LogConfig: structs.DefaultLogConfig(),
1589		Resources: &structs.Resources{
1590			CPU:      500,
1591			MemoryMB: 256,
1592		},
1593	}
1594
1595	// Wait for the client to connect
1596	testutil.WaitForResult(func() (bool, error) {
1597		nodes := s.connectedNodes()
1598		return len(nodes) == 1, nil
1599	}, func(err error) {
1600		t.Fatalf("should have a clients")
1601	})
1602
1603	// Upsert the allocation
1604	state := s.State()
1605	require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
1606	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
1607
1608	// Wait for the client to run the allocation
1609	testutil.WaitForResult(func() (bool, error) {
1610		alloc, err := state.AllocByID(nil, a.ID)
1611		if err != nil {
1612			return false, err
1613		}
1614		if alloc == nil {
1615			return false, fmt.Errorf("unknown alloc")
1616		}
1617		if alloc.ClientStatus != structs.AllocClientStatusComplete {
1618			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
1619		}
1620
1621		return true, nil
1622	}, func(err error) {
1623		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
1624	})
1625
1626	// Make the request
1627	req := &cstructs.FsLogsRequest{
1628		AllocID:      a.ID,
1629		Task:         a.Job.TaskGroups[0].Tasks[0].Name,
1630		LogType:      "stdout",
1631		Origin:       "start",
1632		PlainText:    true,
1633		QueryOptions: structs.QueryOptions{Region: "global"},
1634	}
1635
1636	// Get the handler
1637	handler, err := s.StreamingRpcHandler("FileSystem.Logs")
1638	require.Nil(err)
1639
1640	// Create a pipe
1641	p1, p2 := net.Pipe()
1642	defer p1.Close()
1643	defer p2.Close()
1644
1645	errCh := make(chan error)
1646	streamMsg := make(chan *cstructs.StreamErrWrapper)
1647
1648	// Start the handler
1649	go handler(p2)
1650
1651	// Start the decoder
1652	go func() {
1653		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
1654		for {
1655			var msg cstructs.StreamErrWrapper
1656			if err := decoder.Decode(&msg); err != nil {
1657				if err == io.EOF || strings.Contains(err.Error(), "closed") {
1658					return
1659				}
1660				errCh <- fmt.Errorf("error decoding: %v", err)
1661			}
1662
1663			streamMsg <- &msg
1664		}
1665	}()
1666
1667	// Send the request
1668	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
1669	require.Nil(encoder.Encode(req))
1670
1671	timeout := time.After(3 * time.Second)
1672	received := ""
1673OUTER:
1674	for {
1675		select {
1676		case <-timeout:
1677			t.Fatal("timeout")
1678		case err := <-errCh:
1679			t.Fatal(err)
1680		case msg := <-streamMsg:
1681			if msg.Error != nil {
1682				t.Fatalf("Got error: %v", msg.Error.Error())
1683			}
1684
1685			// Add the payload
1686			received += string(msg.Payload)
1687			if received == expected {
1688				break OUTER
1689			}
1690		}
1691	}
1692}
1693
1694func TestClientFS_Logs_Local_Follow(t *testing.T) {
1695	t.Parallel()
1696	require := require.New(t)
1697
1698	// Start a server and client
1699	s, cleanupS := TestServer(t, nil)
1700	defer cleanupS()
1701	testutil.WaitForLeader(t, s.RPC)
1702
1703	c, cleanup := client.TestClient(t, func(c *config.Config) {
1704		c.Servers = []string{s.config.RPCAddr.String()}
1705	})
1706	defer cleanup()
1707
1708	// Force an allocation onto the node
1709	expectedBase := "Hello from the other side"
1710	repeat := 10
1711
1712	a := mock.Alloc()
1713	a.Job.Type = structs.JobTypeBatch
1714	a.NodeID = c.NodeID()
1715	a.Job.TaskGroups[0].Count = 1
1716	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
1717		Name:   "web",
1718		Driver: "mock_driver",
1719		Config: map[string]interface{}{
1720			"run_for":                "20s",
1721			"stdout_string":          expectedBase,
1722			"stdout_repeat":          repeat,
1723			"stdout_repeat_duration": "200ms",
1724		},
1725		LogConfig: structs.DefaultLogConfig(),
1726		Resources: &structs.Resources{
1727			CPU:      500,
1728			MemoryMB: 256,
1729		},
1730	}
1731
1732	// Wait for the client to connect
1733	testutil.WaitForResult(func() (bool, error) {
1734		nodes := s.connectedNodes()
1735		return len(nodes) == 1, nil
1736	}, func(err error) {
1737		t.Fatalf("should have a clients")
1738	})
1739
1740	// Upsert the allocation
1741	state := s.State()
1742	require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
1743	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
1744
1745	// Wait for the client to run the allocation
1746	testutil.WaitForResult(func() (bool, error) {
1747		alloc, err := state.AllocByID(nil, a.ID)
1748		if err != nil {
1749			return false, err
1750		}
1751		if alloc == nil {
1752			return false, fmt.Errorf("unknown alloc")
1753		}
1754		if alloc.ClientStatus != structs.AllocClientStatusRunning {
1755			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
1756		}
1757
1758		return true, nil
1759	}, func(err error) {
1760		t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err)
1761	})
1762
1763	// Make the request
1764	req := &cstructs.FsLogsRequest{
1765		AllocID:      a.ID,
1766		Task:         a.Job.TaskGroups[0].Tasks[0].Name,
1767		LogType:      "stdout",
1768		Origin:       "start",
1769		PlainText:    true,
1770		Follow:       true,
1771		QueryOptions: structs.QueryOptions{Region: "global"},
1772	}
1773
1774	// Get the handler
1775	handler, err := s.StreamingRpcHandler("FileSystem.Logs")
1776	require.Nil(err)
1777
1778	// Create a pipe
1779	p1, p2 := net.Pipe()
1780	defer p1.Close()
1781	defer p2.Close()
1782
1783	errCh := make(chan error)
1784	streamMsg := make(chan *cstructs.StreamErrWrapper)
1785
1786	// Start the handler
1787	go handler(p2)
1788
1789	// Start the decoder
1790	go func() {
1791		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
1792		for {
1793			var msg cstructs.StreamErrWrapper
1794			if err := decoder.Decode(&msg); err != nil {
1795				if err == io.EOF || strings.Contains(err.Error(), "closed") {
1796					return
1797				}
1798				errCh <- fmt.Errorf("error decoding: %v", err)
1799			}
1800
1801			streamMsg <- &msg
1802		}
1803	}()
1804
1805	// Send the request
1806	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
1807	require.Nil(encoder.Encode(req))
1808
1809	timeout := time.After(20 * time.Second)
1810	expected := strings.Repeat(expectedBase, repeat+1)
1811	received := ""
1812OUTER:
1813	for {
1814		select {
1815		case <-timeout:
1816			t.Fatal("timeout")
1817		case err := <-errCh:
1818			t.Fatal(err)
1819		case msg := <-streamMsg:
1820			if msg.Error != nil {
1821				t.Fatalf("Got error: %v", msg.Error.Error())
1822			}
1823
1824			// Add the payload
1825			received += string(msg.Payload)
1826			if received == expected {
1827				break OUTER
1828			}
1829		}
1830	}
1831}
1832
1833func TestClientFS_Logs_Remote_Server(t *testing.T) {
1834	t.Parallel()
1835	require := require.New(t)
1836
1837	// Start a server and client
1838	s1, cleanupS1 := TestServer(t, func(c *Config) {
1839		c.BootstrapExpect = 2
1840	})
1841	defer cleanupS1()
1842	s2, cleanupS2 := TestServer(t, func(c *Config) {
1843		c.BootstrapExpect = 2
1844	})
1845	defer cleanupS2()
1846	TestJoin(t, s1, s2)
1847	testutil.WaitForLeader(t, s1.RPC)
1848	testutil.WaitForLeader(t, s2.RPC)
1849
1850	c, cleanup := client.TestClient(t, func(c *config.Config) {
1851		c.Servers = []string{s2.config.RPCAddr.String()}
1852	})
1853	defer cleanup()
1854
1855	// Force an allocation onto the node
1856	expected := "Hello from the other side"
1857	a := mock.Alloc()
1858	a.Job.Type = structs.JobTypeBatch
1859	a.NodeID = c.NodeID()
1860	a.Job.TaskGroups[0].Count = 1
1861	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
1862		Name:   "web",
1863		Driver: "mock_driver",
1864		Config: map[string]interface{}{
1865			"run_for":       "2s",
1866			"stdout_string": expected,
1867		},
1868		LogConfig: structs.DefaultLogConfig(),
1869		Resources: &structs.Resources{
1870			CPU:      500,
1871			MemoryMB: 256,
1872		},
1873	}
1874
1875	// Wait for the client to connect
1876	testutil.WaitForResult(func() (bool, error) {
1877		nodes := s2.connectedNodes()
1878		return len(nodes) == 1, nil
1879	}, func(err error) {
1880		t.Fatalf("should have a clients")
1881	})
1882
1883	// Upsert the allocation
1884	state1 := s1.State()
1885	state2 := s2.State()
1886	require.Nil(state1.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
1887	require.Nil(state1.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
1888	require.Nil(state2.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
1889	require.Nil(state2.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
1890
1891	// Wait for the client to run the allocation
1892	testutil.WaitForResult(func() (bool, error) {
1893		alloc, err := state2.AllocByID(nil, a.ID)
1894		if err != nil {
1895			return false, err
1896		}
1897		if alloc == nil {
1898			return false, fmt.Errorf("unknown alloc")
1899		}
1900		if alloc.ClientStatus != structs.AllocClientStatusComplete {
1901			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
1902		}
1903
1904		return true, nil
1905	}, func(err error) {
1906		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
1907	})
1908
1909	// Force remove the connection locally in case it exists
1910	s1.nodeConnsLock.Lock()
1911	delete(s1.nodeConns, c.NodeID())
1912	s1.nodeConnsLock.Unlock()
1913
1914	// Make the request
1915	req := &cstructs.FsLogsRequest{
1916		AllocID:      a.ID,
1917		Task:         a.Job.TaskGroups[0].Tasks[0].Name,
1918		LogType:      "stdout",
1919		Origin:       "start",
1920		PlainText:    true,
1921		QueryOptions: structs.QueryOptions{Region: "global"},
1922	}
1923
1924	// Get the handler
1925	handler, err := s1.StreamingRpcHandler("FileSystem.Logs")
1926	require.Nil(err)
1927
1928	// Create a pipe
1929	p1, p2 := net.Pipe()
1930	defer p1.Close()
1931	defer p2.Close()
1932
1933	errCh := make(chan error)
1934	streamMsg := make(chan *cstructs.StreamErrWrapper)
1935
1936	// Start the handler
1937	go handler(p2)
1938
1939	// Start the decoder
1940	go func() {
1941		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
1942		for {
1943			var msg cstructs.StreamErrWrapper
1944			if err := decoder.Decode(&msg); err != nil {
1945				if err == io.EOF || strings.Contains(err.Error(), "closed") {
1946					return
1947				}
1948				errCh <- fmt.Errorf("error decoding: %v", err)
1949			}
1950
1951			streamMsg <- &msg
1952		}
1953	}()
1954
1955	// Send the request
1956	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
1957	require.Nil(encoder.Encode(req))
1958
1959	timeout := time.After(3 * time.Second)
1960	received := ""
1961OUTER:
1962	for {
1963		select {
1964		case <-timeout:
1965			t.Fatal("timeout")
1966		case err := <-errCh:
1967			t.Fatal(err)
1968		case msg := <-streamMsg:
1969			if msg.Error != nil {
1970				t.Fatalf("Got error: %v", msg.Error.Error())
1971			}
1972
1973			// Add the payload
1974			received += string(msg.Payload)
1975			if received == expected {
1976				break OUTER
1977			}
1978		}
1979	}
1980}
1981
1982func TestClientFS_Logs_Remote_Region(t *testing.T) {
1983	t.Parallel()
1984	require := require.New(t)
1985
1986	// Start a server and client
1987	s1, cleanupS1 := TestServer(t, nil)
1988	defer cleanupS1()
1989	s2, cleanupS2 := TestServer(t, func(c *Config) {
1990		c.Region = "two"
1991	})
1992	defer cleanupS2()
1993	TestJoin(t, s1, s2)
1994	testutil.WaitForLeader(t, s1.RPC)
1995	testutil.WaitForLeader(t, s2.RPC)
1996
1997	c, cleanup := client.TestClient(t, func(c *config.Config) {
1998		c.Servers = []string{s2.config.RPCAddr.String()}
1999		c.Region = "two"
2000	})
2001	defer cleanup()
2002
2003	// Force an allocation onto the node
2004	expected := "Hello from the other side"
2005	a := mock.Alloc()
2006	a.Job.Type = structs.JobTypeBatch
2007	a.NodeID = c.NodeID()
2008	a.Job.TaskGroups[0].Count = 1
2009	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
2010		Name:   "web",
2011		Driver: "mock_driver",
2012		Config: map[string]interface{}{
2013			"run_for":       "2s",
2014			"stdout_string": expected,
2015		},
2016		LogConfig: structs.DefaultLogConfig(),
2017		Resources: &structs.Resources{
2018			CPU:      500,
2019			MemoryMB: 256,
2020		},
2021	}
2022
2023	// Wait for the client to connect
2024	testutil.WaitForResult(func() (bool, error) {
2025		nodes := s2.connectedNodes()
2026		return len(nodes) == 1, nil
2027	}, func(err error) {
2028		t.Fatalf("should have a client")
2029	})
2030
2031	// Upsert the allocation
2032	state2 := s2.State()
2033	require.Nil(state2.UpsertJob(structs.MsgTypeTestSetup, 999, a.Job))
2034	require.Nil(state2.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}))
2035
2036	// Wait for the client to run the allocation
2037	testutil.WaitForResult(func() (bool, error) {
2038		alloc, err := state2.AllocByID(nil, a.ID)
2039		if err != nil {
2040			return false, err
2041		}
2042		if alloc == nil {
2043			return false, fmt.Errorf("unknown alloc")
2044		}
2045		if alloc.ClientStatus != structs.AllocClientStatusComplete {
2046			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
2047		}
2048
2049		return true, nil
2050	}, func(err error) {
2051		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
2052	})
2053
2054	// Force remove the connection locally in case it exists
2055	s1.nodeConnsLock.Lock()
2056	delete(s1.nodeConns, c.NodeID())
2057	s1.nodeConnsLock.Unlock()
2058
2059	// Make the request
2060	req := &cstructs.FsLogsRequest{
2061		AllocID:      a.ID,
2062		Task:         a.Job.TaskGroups[0].Tasks[0].Name,
2063		LogType:      "stdout",
2064		Origin:       "start",
2065		PlainText:    true,
2066		QueryOptions: structs.QueryOptions{Region: "two"},
2067	}
2068
2069	// Get the handler
2070	handler, err := s1.StreamingRpcHandler("FileSystem.Logs")
2071	require.Nil(err)
2072
2073	// Create a pipe
2074	p1, p2 := net.Pipe()
2075	defer p1.Close()
2076	defer p2.Close()
2077
2078	errCh := make(chan error)
2079	streamMsg := make(chan *cstructs.StreamErrWrapper)
2080
2081	// Start the handler
2082	go handler(p2)
2083
2084	// Start the decoder
2085	go func() {
2086		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
2087		for {
2088			var msg cstructs.StreamErrWrapper
2089			if err := decoder.Decode(&msg); err != nil {
2090				if err == io.EOF || strings.Contains(err.Error(), "closed") {
2091					return
2092				}
2093				errCh <- fmt.Errorf("error decoding: %v", err)
2094			}
2095
2096			streamMsg <- &msg
2097		}
2098	}()
2099
2100	// Send the request
2101	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
2102	require.Nil(encoder.Encode(req))
2103
2104	timeout := time.After(3 * time.Second)
2105	received := ""
2106OUTER:
2107	for {
2108		select {
2109		case <-timeout:
2110			t.Fatal("timeout")
2111		case err := <-errCh:
2112			t.Fatal(err)
2113		case msg := <-streamMsg:
2114			if msg.Error != nil {
2115				t.Fatalf("Got error: %v", msg.Error.Error())
2116			}
2117
2118			// Add the payload
2119			received += string(msg.Payload)
2120			if received == expected {
2121				break OUTER
2122			}
2123		}
2124	}
2125}
2126