package client

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"net"
	"os"
	"path/filepath"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/hashicorp/go-msgpack/codec"
	"github.com/hashicorp/nomad/acl"
	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/config"
	sframer "github.com/hashicorp/nomad/client/lib/streamframer"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
	"github.com/stretchr/testify/require"
)

// tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller
// should destroy the temp dir.
func tempAllocDir(t testing.TB) *allocdir.AllocDir {
	dir, err := ioutil.TempDir("", "nomadtest")
	if err != nil {
		t.Fatalf("TempDir() failed: %v", err)
	}

	if err := os.Chmod(dir, 0777); err != nil {
		t.Fatalf("failed to chmod dir: %v", err)
	}

	return allocdir.NewAllocDir(testlog.HCLogger(t), dir)
}

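// nopWriteCloser wraps an io.Writer with a no-op Close method so it can be
// used where an io.WriteCloser is required.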
type nopWriteCloser struct {
	io.Writer
}

func (n nopWriteCloser) Close() error {
	return nil
}

func TestFS_Stat_NoAlloc(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a client
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Make the request with bad allocation id
	req := &cstructs.FsStatRequest{
		AllocID:      uuid.Generate(),
		Path:         "foo",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp cstructs.FsStatResponse
	err := c.ClientRPC("FileSystem.Stat", req, &resp)
	require.NotNil(err)
	require.True(structs.IsErrUnknownAllocation(err))
}

func TestFS_Stat(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	// Create and add an alloc
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "500ms",
	}
	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsStatRequest{
		AllocID:      alloc.ID,
		Path:         "/",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp cstructs.FsStatResponse
	err := c.ClientRPC("FileSystem.Stat", req, &resp)
	require.Nil(err)
	require.NotNil(resp.Info)
	require.True(resp.Info.IsDir)
}

func TestFS_Stat_ACL(t *testing.T) {
	t.Parallel()

	// Start a server
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanup()

	// Create a bad token
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

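	// Create a good token with read capabilities on the default namespace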
	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for client to be running job
	alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:  "good token",
			Token: tokenGood.SecretID,
		},
		{
			Name:  "root token",
			Token: root.SecretID,
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {
			req := &cstructs.FsStatRequest{
				AllocID: alloc.ID,
				Path:    "/",
				QueryOptions: structs.QueryOptions{
					Region:    "global",
					AuthToken: c.Token,
					Namespace: structs.DefaultNamespace,
				},
			}

			var resp cstructs.FsStatResponse
			err := client.ClientRPC("FileSystem.Stat", req, &resp)
			if c.ExpectedError == "" {
				require.NoError(t, err)
			} else {
				require.NotNil(t, err)
				require.Contains(t, err.Error(), c.ExpectedError)
			}
		})
	}
}

func TestFS_List_NoAlloc(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a client
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Make the request with bad allocation id
	req := &cstructs.FsListRequest{
		AllocID:      uuid.Generate(),
		Path:         "foo",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp cstructs.FsListResponse
	err := c.ClientRPC("FileSystem.List", req, &resp)
	require.NotNil(err)
	require.True(structs.IsErrUnknownAllocation(err))
}

func TestFS_List(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	// Create and add an alloc
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "500ms",
	}
	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsListRequest{
		AllocID:      alloc.ID,
		Path:         "/",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp cstructs.FsListResponse
	err := c.ClientRPC("FileSystem.List", req, &resp)
	require.Nil(err)
	require.NotEmpty(resp.Files)
	require.True(resp.Files[0].IsDir)
}

func TestFS_List_ACL(t *testing.T) {
	t.Parallel()

	// Start a server
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanup()

	// Create a bad token
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

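	// Create a good token with read capabilities on the default namespace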
	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for client to be running job
	alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:  "good token",
			Token: tokenGood.SecretID,
		},
		{
			Name:  "root token",
			Token: root.SecretID,
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {
			// Make the request with this case's token
			req := &cstructs.FsListRequest{
				AllocID: alloc.ID,
				Path:    "/",
				QueryOptions: structs.QueryOptions{
					Region:    "global",
					AuthToken: c.Token,
					Namespace: structs.DefaultNamespace,
				},
			}

			var resp cstructs.FsListResponse
			err := client.ClientRPC("FileSystem.List", req, &resp)
			if c.ExpectedError == "" {
				require.NoError(t, err)
			} else {
				require.EqualError(t, err, c.ExpectedError)
			}
		})
	}
}

func TestFS_Stream_NoAlloc(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a client
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Make the request with bad allocation id
	req := &cstructs.FsStreamRequest{
		AllocID:      uuid.Generate(),
		Path:         "foo",
		Origin:       "start",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Stream")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(3 * time.Second)

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			t.Logf("Got msg %+v", msg)
			if msg.Error == nil {
				continue
			}

			if structs.IsErrUnknownAllocation(msg.Error) {
				break OUTER
			} else {
				t.Fatalf("bad error: %v", msg.Error)
			}
		}
	}
}

func TestFS_Stream_ACL(t *testing.T) {
	t.Parallel()

	// Start a server
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanup()

	// Create a bad token
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

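	// Create a good token with read capabilities on the default namespace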
	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for client to be running job
	alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:  "good token",
			Token: tokenGood.SecretID,
		},
		{
			Name:  "root token",
			Token: root.SecretID,
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {
			// Make the request with this case's token
			req := &cstructs.FsStreamRequest{
				AllocID: alloc.ID,
				Path:    "foo",
				Origin:  "start",
				QueryOptions: structs.QueryOptions{
					Namespace: structs.DefaultNamespace,
					Region:    "global",
					AuthToken: c.Token,
				},
			}

			// Get the handler
			handler, err := client.StreamingRpcHandler("FileSystem.Stream")
			require.Nil(t, err)

			// Create a pipe
			p1, p2 := net.Pipe()
			defer p1.Close()
			defer p2.Close()

			errCh := make(chan error)
			streamMsg := make(chan *cstructs.StreamErrWrapper)

			// Start the handler
			go handler(p2)

			// Start the decoder
			go func() {
				decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
				for {
					var msg cstructs.StreamErrWrapper
					if err := decoder.Decode(&msg); err != nil {
						errCh <- err
						return
					}

					streamMsg <- &msg
				}
			}()

			// Send the request
			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
			require.NoError(t, encoder.Encode(req))

			timeout := time.After(5 * time.Second)

		OUTER:
			for {
				select {
				case <-timeout:
					t.Fatal("timeout")
				case err := <-errCh:
					eof := err == io.EOF || strings.Contains(err.Error(), "closed")
					if c.ExpectedError == "" && eof {
						// No error was expected!
						return
					}
					t.Fatal(err)
				case msg := <-streamMsg:
					if msg.Error == nil {
						continue
					}

					if strings.Contains(msg.Error.Error(), c.ExpectedError) {
						break OUTER
					} else {
						t.Fatalf("Bad error: %v", msg.Error)
					}
				}
			}
		})
	}
}

func TestFS_Stream(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	expected := "Hello from the other side"
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":       "2s",
		"stdout_string": expected,
	}

	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsStreamRequest{
		AllocID:      alloc.ID,
		Path:         "alloc/logs/web.stdout.0",
		PlainText:    true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Stream")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	// Wrap the pipe so we can check it is closed
	pipeChecker := &ReadWriteCloseChecker{ReadWriteCloser: p2}

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(pipeChecker)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(3 * time.Second)
	received := ""
OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}

	testutil.WaitForResult(func() (bool, error) {
		pipeChecker.l.Lock()
		defer pipeChecker.l.Unlock()

		return pipeChecker.Closed, nil
	}, func(err error) {
		t.Fatal("Pipe not closed")
	})
}

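// ReadWriteCloseChecker wraps an io.ReadWriteCloser and records whether Close
// was called so tests can assert that a stream was torn down.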
type ReadWriteCloseChecker struct {
	io.ReadWriteCloser
	l      sync.Mutex
	Closed bool
}

func (r *ReadWriteCloseChecker) Close() error {
	r.l.Lock()
	r.Closed = true
	r.l.Unlock()
	return r.ReadWriteCloser.Close()
}

func TestFS_Stream_Follow(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	expectedBase := "Hello from the other side"
	repeat := 10

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":                "20s",
		"stdout_string":          expectedBase,
		"stdout_repeat":          repeat,
		"stdout_repeat_duration": "200ms",
	}

	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsStreamRequest{
		AllocID:      alloc.ID,
		Path:         "alloc/logs/web.stdout.0",
		PlainText:    true,
		Follow:       true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Stream")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(20 * time.Second)
	expected := strings.Repeat(expectedBase, repeat+1)
	received := ""
OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}
}

func TestFS_Stream_Limit(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanup := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanup()

	var limit int64 = 5
	full := "Hello from the other side"
	expected := full[:limit]
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":       "2s",
		"stdout_string": full,
	}

	// Wait for alloc to be running
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsStreamRequest{
		AllocID:      alloc.ID,
		Path:         "alloc/logs/web.stdout.0",
		PlainText:    true,
		Limit:        limit,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Stream")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(3 * time.Second)
	received := ""
OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}
}

func TestFS_Logs_NoAlloc(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a client
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Make the request with bad allocation id
	req := &cstructs.FsLogsRequest{
		AllocID:      uuid.Generate(),
		Task:         "foo",
		LogType:      "stdout",
		Origin:       "start",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Logs")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(3 * time.Second)

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			t.Logf("Got msg %+v", msg)
			if msg.Error == nil {
				continue
			}

			if structs.IsErrUnknownAllocation(msg.Error) {
				break OUTER
			} else {
				t.Fatalf("bad error: %v", msg.Error)
			}
		}
	}
}

// TestFS_Logs_TaskPending asserts that trying to stream logs for tasks which
// have not started returns a 404 error.
func TestFS_Logs_TaskPending(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"start_block_for": "10s",
	}

	// Register job
	args := &structs.JobRegisterRequest{}
	args.Job = job
	args.WriteRequest.Region = "global"
	args.Namespace = job.Namespace
	var jobResp structs.JobRegisterResponse
	require.NoError(s.RPC("Job.Register", args, &jobResp))

	// Get the allocation ID
	var allocID string
	testutil.WaitForResult(func() (bool, error) {
		args := structs.AllocListRequest{}
		args.Region = "global"
		resp := structs.AllocListResponse{}
		if err := s.RPC("Alloc.List", &args, &resp); err != nil {
			return false, err
		}

		if len(resp.Allocations) != 1 {
			return false, fmt.Errorf("expected 1 alloc, found %d", len(resp.Allocations))
		}

		allocID = resp.Allocations[0].ID

		// wait for alloc runner to be created; otherwise, we get no alloc found error
		if _, err := c.getAllocRunner(allocID); err != nil {
			return false, fmt.Errorf("alloc runner was not created yet for %v", allocID)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("error getting alloc id: %v", err)
	})

	// Make the request
	req := &cstructs.FsLogsRequest{
		AllocID:      allocID,
		Task:         job.TaskGroups[0].Tasks[0].Name,
		LogType:      "stdout",
		Origin:       "start",
		PlainText:    true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Logs")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	for {
		select {
		case <-time.After(3 * time.Second):
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatalf("unexpected stream error: %v", err)
		case msg := <-streamMsg:
			require.NotNil(msg.Error)
			require.NotNil(msg.Error.Code)
			require.EqualValues(404, *msg.Error.Code)
			require.Contains(msg.Error.Message, "not started")
			return
		}
	}
}

func TestFS_Logs_ACL(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server
	s, root, cleanupS := nomad.TestACLServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	client, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanup()

	// Create a bad token
	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)

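	// Create a good token with read capabilities on the default namespace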
	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "",
		[]string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS})
	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "20s",
	}

	// Wait for client to be running job
	alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0]

	cases := []struct {
		Name          string
		Token         string
		ExpectedError string
	}{
		{
			Name:          "bad token",
			Token:         tokenBad.SecretID,
			ExpectedError: structs.ErrPermissionDenied.Error(),
		},
		{
			Name:  "good token",
			Token: tokenGood.SecretID,
		},
		{
			Name:  "root token",
			Token: root.SecretID,
		},
	}

	for _, c := range cases {
		t.Run(c.Name, func(t *testing.T) {
			// Make the request with this case's token
			req := &cstructs.FsLogsRequest{
				AllocID: alloc.ID,
				Task:    job.TaskGroups[0].Tasks[0].Name,
				LogType: "stdout",
				Origin:  "start",
				QueryOptions: structs.QueryOptions{
					Namespace: structs.DefaultNamespace,
					Region:    "global",
					AuthToken: c.Token,
				},
			}

			// Get the handler
			handler, err := client.StreamingRpcHandler("FileSystem.Logs")
			require.Nil(err)

			// Create a pipe
			p1, p2 := net.Pipe()
			defer p1.Close()
			defer p2.Close()

			errCh := make(chan error)
			streamMsg := make(chan *cstructs.StreamErrWrapper)

			// Start the handler
			go handler(p2)

			// Start the decoder
			go func() {
				decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
				for {
					var msg cstructs.StreamErrWrapper
					if err := decoder.Decode(&msg); err != nil {
						errCh <- err
						return
					}

					streamMsg <- &msg
				}
			}()

			// Send the request
			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
			require.Nil(encoder.Encode(req))

			timeout := time.After(5 * time.Second)

		OUTER:
			for {
				select {
				case <-timeout:
					t.Fatal("timeout")
				case err := <-errCh:
					eof := err == io.EOF || strings.Contains(err.Error(), "closed")
					if c.ExpectedError == "" && eof {
						// No error was expected!
						return
					}
					t.Fatal(err)
				case msg := <-streamMsg:
					if msg.Error == nil {
						continue
					}

					if strings.Contains(msg.Error.Error(), c.ExpectedError) {
						// Ok! Error matched expectation.
						break OUTER
					} else {
						t.Fatalf("Bad error: %v", msg.Error)
					}
				}
			}
		})
	}
}

func TestFS_Logs(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	expected := "Hello from the other side\n"
	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":       "2s",
		"stdout_string": expected,
	}

	// Wait for client to be running job
	testutil.WaitForRunning(t, s.RPC, job)

	// Get the allocation ID
	args := structs.AllocListRequest{}
	args.Region = "global"
	resp := structs.AllocListResponse{}
	require.NoError(s.RPC("Alloc.List", &args, &resp))
	require.Len(resp.Allocations, 1)
	allocID := resp.Allocations[0].ID

	// Make the request
	req := &cstructs.FsLogsRequest{
		AllocID:      allocID,
		Task:         job.TaskGroups[0].Tasks[0].Name,
		LogType:      "stdout",
		Origin:       "start",
		PlainText:    true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Logs")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(3 * time.Second)
	received := ""
OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}
}

func TestFS_Logs_Follow(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and client
	s, cleanupS := nomad.TestServer(t, nil)
	defer cleanupS()
	testutil.WaitForLeader(t, s.RPC)

	c, cleanupC := TestClient(t, func(c *config.Config) {
		c.Servers = []string{s.GetConfig().RPCAddr.String()}
	})
	defer cleanupC()

	expectedBase := "Hello from the other side\n"
	repeat := 10

	job := mock.BatchJob()
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":                "20s",
		"stdout_string":          expectedBase,
		"stdout_repeat":          repeat,
		"stdout_repeat_duration": "200ms",
	}

	// Wait for client to be running job
	alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

	// Make the request
	req := &cstructs.FsLogsRequest{
		AllocID:      alloc.ID,
		Task:         job.TaskGroups[0].Tasks[0].Name,
		LogType:      "stdout",
		Origin:       "start",
		PlainText:    true,
		Follow:       true,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := c.StreamingRpcHandler("FileSystem.Logs")
	require.NoError(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(20 * time.Second)
	expected := strings.Repeat(expectedBase, repeat+1)
	received := ""
OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			if msg.Error != nil {
				t.Fatalf("Got error: %v", msg.Error.Error())
			}

			// Add the payload
			received += string(msg.Payload)
			if received == expected {
				break OUTER
			}
		}
	}
}

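// TestFS_findClosest exercises how the closest rotated log file is chosen for
// a desired file index and offset, covering error, beginning, middle, and end
// cases.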
func TestFS_findClosest(t *testing.T) {
	task := "foo"
	entries := []*cstructs.AllocFileInfo{
		{
			Name: "foo.stdout.0",
			Size: 100,
		},
		{
			Name: "foo.stdout.1",
			Size: 100,
		},
		{
			Name: "foo.stdout.2",
			Size: 100,
		},
		{
			Name: "foo.stdout.3",
			Size: 100,
		},
		{
			Name: "foo.stderr.0",
			Size: 100,
		},
		{
			Name: "foo.stderr.1",
			Size: 100,
		},
		{
			Name: "foo.stderr.2",
			Size: 100,
		},
	}

	cases := []struct {
		Entries        []*cstructs.AllocFileInfo
		DesiredIdx     int64
		DesiredOffset  int64
		Task           string
		LogType        string
		ExpectedFile   string
		ExpectedIdx    int64
		ExpectedOffset int64
		Error          bool
	}{
		// Test error cases
		{
			Entries:    nil,
			DesiredIdx: 0,
			Task:       task,
			LogType:    "stdout",
			Error:      true,
		},
		{
			Entries:    entries[0:3],
			DesiredIdx: 0,
			Task:       task,
			LogType:    "stderr",
			Error:      true,
		},

		// Test beginning cases
		{
			Entries:      entries,
			DesiredIdx:   0,
			Task:         task,
			LogType:      "stdout",
			ExpectedFile: entries[0].Name,
			ExpectedIdx:  0,
		},
		{
			// Desired offset should be ignored at edges
			Entries:        entries,
			DesiredIdx:     0,
			DesiredOffset:  -100,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[0].Name,
			ExpectedIdx:    0,
			ExpectedOffset: 0,
		},
		{
			// Desired offset should be ignored at edges
			Entries:        entries,
			DesiredIdx:     1,
			DesiredOffset:  -1000,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[0].Name,
			ExpectedIdx:    0,
			ExpectedOffset: 0,
		},
		{
			Entries:      entries,
			DesiredIdx:   0,
			Task:         task,
			LogType:      "stderr",
			ExpectedFile: entries[4].Name,
			ExpectedIdx:  0,
		},
		{
			Entries:      entries,
			DesiredIdx:   0,
			Task:         task,
			LogType:      "stdout",
			ExpectedFile: entries[0].Name,
			ExpectedIdx:  0,
		},

		// Test middle cases
		{
			Entries:      entries,
			DesiredIdx:   1,
			Task:         task,
			LogType:      "stdout",
			ExpectedFile: entries[1].Name,
			ExpectedIdx:  1,
		},
		{
			Entries:        entries,
			DesiredIdx:     1,
			DesiredOffset:  10,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[1].Name,
			ExpectedIdx:    1,
			ExpectedOffset: 10,
		},
		{
			Entries:        entries,
			DesiredIdx:     1,
			DesiredOffset:  110,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[2].Name,
			ExpectedIdx:    2,
			ExpectedOffset: 10,
		},
		{
			Entries:      entries,
			DesiredIdx:   1,
			Task:         task,
			LogType:      "stderr",
			ExpectedFile: entries[5].Name,
			ExpectedIdx:  1,
		},
		// Test end cases
		{
			Entries:      entries,
			DesiredIdx:   math.MaxInt64,
			Task:         task,
			LogType:      "stdout",
			ExpectedFile: entries[3].Name,
			ExpectedIdx:  3,
		},
		{
			Entries:        entries,
			DesiredIdx:     math.MaxInt64,
			DesiredOffset:  math.MaxInt64,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[3].Name,
			ExpectedIdx:    3,
			ExpectedOffset: 100,
		},
		{
			Entries:        entries,
			DesiredIdx:     math.MaxInt64,
			DesiredOffset:  -10,
			Task:           task,
			LogType:        "stdout",
			ExpectedFile:   entries[3].Name,
			ExpectedIdx:    3,
			ExpectedOffset: 90,
		},
		{
			Entries:      entries,
			DesiredIdx:   math.MaxInt64,
			Task:         task,
			LogType:      "stderr",
			ExpectedFile: entries[6].Name,
			ExpectedIdx:  2,
		},
	}

	for i, c := range cases {
		entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType)
		if err != nil {
			if !c.Error {
				t.Fatalf("case %d: Unexpected error: %v", i, err)
			}
			continue
		}

		if entry.Name != c.ExpectedFile {
			t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile)
		}
		if idx != c.ExpectedIdx {
			t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx)
		}
		if offset != c.ExpectedOffset {
			t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset)
		}
	}
}

func TestFS_streamFile_NoFile(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	frames := make(chan *sframer.StreamFrame, 32)
	framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	err := c.endpoints.FileSystem.streamFile(
		context.Background(), 0, "foo", 0, ad, framer, nil)
	require.NotNil(err)
	if runtime.GOOS == "windows" {
		require.Contains(err.Error(), "cannot find the file")
	} else {
		require.Contains(err.Error(), "no such file")
	}
}

func TestFS_streamFile_Modify(t *testing.T) {
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	// Create a file in the temp dir
	streamFile := "stream_file"
	f, err := os.Create(filepath.Join(ad.AllocDir, streamFile))
	if err != nil {
		t.Fatalf("Failed to create file: %v", err)
	}
	defer f.Close()

	data := []byte("helloworld")

	// Start the reader
	resultCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	go func() {
		var collected []byte
		for {
			frame := <-frames
			if frame.IsHeartbeat() {
				continue
			}

			collected = append(collected, frame.Data...)
			if reflect.DeepEqual(data, collected) {
				resultCh <- struct{}{}
				return
			}
		}
	}()

	// Write a few bytes
	if _, err := f.Write(data[:3]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// Start streaming
	go func() {
		if err := c.endpoints.FileSystem.streamFile(
			context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
			t.Fatalf("stream() failed: %v", err)
		}
	}()

	// Sleep a little before writing more. This lets us check if the watch
	// is working.
	time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
	if _, err := f.Write(data[3:]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	select {
	case <-resultCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("failed to send new data")
	}
}

func TestFS_streamFile_Truncate(t *testing.T) {
	t.Parallel()
	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	// Create a file in the temp dir
	data := []byte("helloworld")
	streamFile := "stream_file"
	streamFilePath := filepath.Join(ad.AllocDir, streamFile)
	f, err := os.Create(streamFilePath)
	if err != nil {
		t.Fatalf("Failed to create file: %v", err)
	}
	defer f.Close()

	// Start the reader
	truncateCh := make(chan struct{})
	truncateClosed := false
	dataPostTruncCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	go func() {
		var collected []byte
		for {
			frame := <-frames
			if frame.IsHeartbeat() {
				continue
			}

			if frame.FileEvent == truncateEvent && !truncateClosed {
				close(truncateCh)
				truncateClosed = true
			}

			collected = append(collected, frame.Data...)
			if reflect.DeepEqual(data, collected) {
				close(dataPostTruncCh)
				return
			}
		}
	}()

	// Write a few bytes
	if _, err := f.Write(data[:3]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// Start streaming
	go func() {
		if err := c.endpoints.FileSystem.streamFile(
			context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
			t.Fatalf("stream() failed: %v", err)
		}
	}()

	// Sleep a little before truncating. This lets us check if the watch
	// is working.
	time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
	if err := f.Truncate(0); err != nil {
		t.Fatalf("truncate failed: %v", err)
	}
	if err := f.Sync(); err != nil {
		t.Fatalf("sync failed: %v", err)
	}
	if err := f.Close(); err != nil {
		t.Fatalf("failed to close file: %v", err)
	}

	f2, err := os.OpenFile(streamFilePath, os.O_RDWR, 0)
	if err != nil {
		t.Fatalf("failed to reopen file: %v", err)
	}
	defer f2.Close()
	if _, err := f2.Write(data[3:5]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	select {
	case <-truncateCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive truncate")
	}

	// Sleep a little before writing more. This lets us check if the watch
	// is working.
	time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
	if _, err := f2.Write(data[5:]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	select {
	case <-dataPostTruncCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive post truncate data")
	}
}

func TestFS_streamImpl_Delete(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("Windows does not allow us to delete a file while it is open")
	}
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	// Create a file in the temp dir
	data := []byte("helloworld")
	streamFile := "stream_file"
	streamFilePath := filepath.Join(ad.AllocDir, streamFile)
	f, err := os.Create(streamFilePath)
	if err != nil {
		t.Fatalf("Failed to create file: %v", err)
	}
	defer f.Close()

	// Start the reader
	deleteCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	go func() {
		for {
			frame, ok := <-frames
			if !ok {
				return
			}

			if frame.IsHeartbeat() {
				continue
			}

			if frame.FileEvent == deleteEvent {
				close(deleteCh)
				return
			}
		}
	}()

	// Write a few bytes
	if _, err := f.Write(data[:3]); err != nil {
		t.Fatalf("write failed: %v", err)
	}

	framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// Start streaming
	go func() {
		if err := c.endpoints.FileSystem.streamFile(
			context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
			t.Fatalf("stream() failed: %v", err)
		}
	}()

	// Sleep a little before deleting. This lets us check if the watch
	// is working.
	time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second)
	if err := os.Remove(streamFilePath); err != nil {
		t.Fatalf("delete failed: %v", err)
	}

	select {
	case <-deleteCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive delete")
	}
}

func TestFS_logsImpl_NoFollow(t *testing.T) {
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir and create the log dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
	if err := os.MkdirAll(logDir, 0777); err != nil {
		t.Fatalf("Failed to make log dir: %v", err)
	}

	// Create a series of log files in the temp dir
	task := "foo"
	logType := "stdout"
	expected := []byte("012")
	for i := 0; i < 3; i++ {
		logFile := fmt.Sprintf("%s.%s.%d", task, logType, i)
		logFilePath := filepath.Join(logDir, logFile)
		err := ioutil.WriteFile(logFilePath, expected[i:i+1], 0777)
		if err != nil {
			t.Fatalf("Failed to create file: %v", err)
		}
	}

	// Start the reader
	resultCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	var received []byte
	go func() {
		for {
			frame, ok := <-frames
			if !ok {
				return
			}

			if frame.IsHeartbeat() {
				continue
			}

			received = append(received, frame.Data...)
			if reflect.DeepEqual(received, expected) {
				close(resultCh)
				return
			}
		}
	}()

	// Start streaming logs
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := c.endpoints.FileSystem.logsImpl(
		ctx, false, false, 0,
		OriginStart, task, logType, ad, frames); err != nil {
		t.Fatalf("logsImpl failed: %v", err)
	}

	select {
	case <-resultCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive data: got %q", string(received))
	}
}

func TestFS_logsImpl_Follow(t *testing.T) {
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Get a temp alloc dir and create the log dir
	ad := tempAllocDir(t)
	defer os.RemoveAll(ad.AllocDir)

	logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName)
	if err := os.MkdirAll(logDir, 0777); err != nil {
		t.Fatalf("Failed to make log dir: %v", err)
	}

	// Create a series of log files in the temp dir
	task := "foo"
	logType := "stdout"
	expected := []byte("012345")
	initialWrites := 3

	writeToFile := func(index int, data []byte) {
		logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
		logFilePath := filepath.Join(logDir, logFile)
		err := ioutil.WriteFile(logFilePath, data, 0777)
		if err != nil {
			t.Fatalf("Failed to create file: %v", err)
		}
	}
	for i := 0; i < initialWrites; i++ {
		writeToFile(i, expected[i:i+1])
	}

	// Start the reader
	firstResultCh := make(chan struct{})
	fullResultCh := make(chan struct{})
	frames := make(chan *sframer.StreamFrame, 4)
	var received []byte
	go func() {
		for {
			frame, ok := <-frames
			if !ok {
				return
			}

			if frame.IsHeartbeat() {
				continue
			}

			received = append(received, frame.Data...)
			if reflect.DeepEqual(received, expected[:initialWrites]) {
				close(firstResultCh)
			} else if reflect.DeepEqual(received, expected) {
				close(fullResultCh)
				return
			}
		}
	}()

	// Start streaming logs
	go c.endpoints.FileSystem.logsImpl(
		context.Background(), true, false, 0,
		OriginStart, task, logType, ad, frames)

	select {
	case <-firstResultCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive data: got %q", string(received))
	}

	// We got the first chunk of data, write out the rest to the next file
	// at an index much ahead to check that it is following and detecting
	// skips
	skipTo := initialWrites + 10
	writeToFile(skipTo, expected[initialWrites:])

	select {
	case <-fullResultCh:
	case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow):
		t.Fatalf("did not receive data: got %q", string(received))
	}
}