1package client_test
2
3import (
4	"bytes"
5	"fmt"
6	"io/ioutil"
7	"path/filepath"
8	"testing"
9
10	"github.com/hashicorp/nomad/command/agent"
11	"github.com/hashicorp/nomad/nomad"
12	"github.com/hashicorp/nomad/nomad/mock"
13	"github.com/hashicorp/nomad/nomad/structs"
14	"github.com/hashicorp/nomad/nomad/structs/config"
15	"github.com/hashicorp/nomad/testutil"
16	"github.com/stretchr/testify/require"
17)
18
19// TestPrevAlloc_StreamAllocDir_TLS asserts ephemeral disk migrations still
20// work when TLS is enabled.
21func TestPrevAlloc_StreamAllocDir_TLS(t *testing.T) {
22	const (
23		caFn         = "../helper/tlsutil/testdata/global-ca.pem"
24		serverCertFn = "../helper/tlsutil/testdata/global-server.pem"
25		serverKeyFn  = "../helper/tlsutil/testdata/global-server-key.pem"
26		clientCertFn = "../helper/tlsutil/testdata/global-client.pem"
27		clientKeyFn  = "../helper/tlsutil/testdata/global-client-key.pem"
28	)
29	t.Parallel()
30	require := require.New(t)
31
32	server, cleanupS := nomad.TestServer(t, func(c *nomad.Config) {
33		c.TLSConfig = &config.TLSConfig{
34			EnableHTTP:           true,
35			EnableRPC:            true,
36			VerifyServerHostname: true,
37			CAFile:               caFn,
38			CertFile:             serverCertFn,
39			KeyFile:              serverKeyFn,
40		}
41	})
42	defer cleanupS()
43	testutil.WaitForLeader(t, server.RPC)
44
45	t.Logf("[TEST] Leader started: %s", server.GetConfig().RPCAddr.String())
46
47	agentConfFunc := func(c *agent.Config) {
48		c.Region = "global"
49		c.TLSConfig = &config.TLSConfig{
50			EnableHTTP:           true,
51			EnableRPC:            true,
52			VerifyServerHostname: true,
53			CAFile:               caFn,
54			CertFile:             clientCertFn,
55			KeyFile:              clientKeyFn,
56		}
57		c.Client.Enabled = true
58		c.Client.Servers = []string{server.GetConfig().RPCAddr.String()}
59	}
60	client1 := agent.NewTestAgent(t, "client1", agentConfFunc)
61	defer client1.Shutdown()
62
63	client2 := agent.NewTestAgent(t, "client2", agentConfFunc)
64	defer client2.Shutdown()
65
66	job := mock.Job()
67	job.Constraints = []*structs.Constraint{
68		{
69			LTarget: "${node.unique.name}",
70			RTarget: "client1",
71			Operand: "=",
72		},
73	}
74	job.TaskGroups[0].Count = 1
75	job.TaskGroups[0].EphemeralDisk.Sticky = true
76	job.TaskGroups[0].EphemeralDisk.Migrate = true
77	job.TaskGroups[0].Tasks[0] = &structs.Task{
78		Name:   "migrate_tls",
79		Driver: "mock_driver",
80		Config: map[string]interface{}{
81			"run_for": "1m",
82		},
83		LogConfig: structs.DefaultLogConfig(),
84		Resources: &structs.Resources{
85			CPU:      50,
86			MemoryMB: 25,
87		},
88	}
89	testutil.WaitForRunning(t, server.RPC, job.Copy())
90
91	allocArgs := &structs.JobSpecificRequest{}
92	allocArgs.JobID = job.ID
93	allocArgs.QueryOptions.Region = "global"
94	var allocReply structs.JobAllocationsResponse
95	require.NoError(server.RPC("Job.Allocations", allocArgs, &allocReply))
96	require.Len(allocReply.Allocations, 1)
97	origAlloc := allocReply.Allocations[0].ID
98
99	// Save a file into alloc dir
100	contents := []byte("123\n456")
101	allocFn := filepath.Join(client1.DataDir, "alloc", origAlloc, "alloc", "data", "bar")
102	require.NoError(ioutil.WriteFile(allocFn, contents, 0666))
103	t.Logf("[TEST] Wrote initial file: %s", allocFn)
104
105	// Migrate alloc to other node
106	job.Constraints[0].RTarget = "client2"
107
108	// Only register job - don't wait for running - since previous completed allocs
109	// will interfere
110	testutil.RegisterJob(t, server.RPC, job.Copy())
111
112	// Wait for new alloc to be running
113	var newAlloc *structs.AllocListStub
114	testutil.WaitForResult(func() (bool, error) {
115		allocArgs := &structs.JobSpecificRequest{}
116		allocArgs.JobID = job.ID
117		allocArgs.QueryOptions.Region = "global"
118		var allocReply structs.JobAllocationsResponse
119		require.NoError(server.RPC("Job.Allocations", allocArgs, &allocReply))
120		if n := len(allocReply.Allocations); n != 2 {
121			return false, fmt.Errorf("expected 2 allocs found %d", n)
122		}
123
124		// Pick the one that didn't exist before
125		if allocReply.Allocations[0].ID == origAlloc {
126			newAlloc = allocReply.Allocations[1]
127		} else {
128			newAlloc = allocReply.Allocations[0]
129		}
130
131		return newAlloc.ClientStatus != structs.AllocClientStatusRunning,
132			fmt.Errorf("client status: %v", newAlloc.ClientStatus)
133	}, func(err error) {
134		t.Fatalf("new alloc not running: %v", err)
135	})
136
137	// Wait for file to appear on other client
138	allocFn2 := filepath.Join(client2.DataDir, "alloc", newAlloc.ID, "alloc", "data", "bar")
139	t.Logf("[TEST] Comparing against file: %s", allocFn2)
140	testutil.WaitForResult(func() (bool, error) {
141		found, err := ioutil.ReadFile(allocFn2)
142		if err != nil {
143			return false, err
144		}
145		return bytes.Equal(contents, found), fmt.Errorf("contents misatch. expected:\n%s\n\nfound:\n%s\n",
146			contents, found)
147	}, func(err error) {
148		t.Fatalf("file didn't migrate: %v", err)
149	})
150}
151