1package client_test 2 3import ( 4 "bytes" 5 "fmt" 6 "io/ioutil" 7 "path/filepath" 8 "testing" 9 10 "github.com/hashicorp/nomad/command/agent" 11 "github.com/hashicorp/nomad/nomad" 12 "github.com/hashicorp/nomad/nomad/mock" 13 "github.com/hashicorp/nomad/nomad/structs" 14 "github.com/hashicorp/nomad/nomad/structs/config" 15 "github.com/hashicorp/nomad/testutil" 16 "github.com/stretchr/testify/require" 17) 18 19// TestPrevAlloc_StreamAllocDir_TLS asserts ephemeral disk migrations still 20// work when TLS is enabled. 21func TestPrevAlloc_StreamAllocDir_TLS(t *testing.T) { 22 const ( 23 caFn = "../helper/tlsutil/testdata/global-ca.pem" 24 serverCertFn = "../helper/tlsutil/testdata/global-server.pem" 25 serverKeyFn = "../helper/tlsutil/testdata/global-server-key.pem" 26 clientCertFn = "../helper/tlsutil/testdata/global-client.pem" 27 clientKeyFn = "../helper/tlsutil/testdata/global-client-key.pem" 28 ) 29 t.Parallel() 30 require := require.New(t) 31 32 server, cleanupS := nomad.TestServer(t, func(c *nomad.Config) { 33 c.TLSConfig = &config.TLSConfig{ 34 EnableHTTP: true, 35 EnableRPC: true, 36 VerifyServerHostname: true, 37 CAFile: caFn, 38 CertFile: serverCertFn, 39 KeyFile: serverKeyFn, 40 } 41 }) 42 defer cleanupS() 43 testutil.WaitForLeader(t, server.RPC) 44 45 t.Logf("[TEST] Leader started: %s", server.GetConfig().RPCAddr.String()) 46 47 agentConfFunc := func(c *agent.Config) { 48 c.Region = "global" 49 c.TLSConfig = &config.TLSConfig{ 50 EnableHTTP: true, 51 EnableRPC: true, 52 VerifyServerHostname: true, 53 CAFile: caFn, 54 CertFile: clientCertFn, 55 KeyFile: clientKeyFn, 56 } 57 c.Client.Enabled = true 58 c.Client.Servers = []string{server.GetConfig().RPCAddr.String()} 59 } 60 client1 := agent.NewTestAgent(t, "client1", agentConfFunc) 61 defer client1.Shutdown() 62 63 client2 := agent.NewTestAgent(t, "client2", agentConfFunc) 64 defer client2.Shutdown() 65 66 job := mock.Job() 67 job.Constraints = []*structs.Constraint{ 68 { 69 LTarget: "${node.unique.name}", 70 RTarget: "client1", 71 Operand: "=", 72 }, 73 } 74 job.TaskGroups[0].Count = 1 75 job.TaskGroups[0].EphemeralDisk.Sticky = true 76 job.TaskGroups[0].EphemeralDisk.Migrate = true 77 job.TaskGroups[0].Tasks[0] = &structs.Task{ 78 Name: "migrate_tls", 79 Driver: "mock_driver", 80 Config: map[string]interface{}{ 81 "run_for": "1m", 82 }, 83 LogConfig: structs.DefaultLogConfig(), 84 Resources: &structs.Resources{ 85 CPU: 50, 86 MemoryMB: 25, 87 }, 88 } 89 testutil.WaitForRunning(t, server.RPC, job.Copy()) 90 91 allocArgs := &structs.JobSpecificRequest{} 92 allocArgs.JobID = job.ID 93 allocArgs.QueryOptions.Region = "global" 94 var allocReply structs.JobAllocationsResponse 95 require.NoError(server.RPC("Job.Allocations", allocArgs, &allocReply)) 96 require.Len(allocReply.Allocations, 1) 97 origAlloc := allocReply.Allocations[0].ID 98 99 // Save a file into alloc dir 100 contents := []byte("123\n456") 101 allocFn := filepath.Join(client1.DataDir, "alloc", origAlloc, "alloc", "data", "bar") 102 require.NoError(ioutil.WriteFile(allocFn, contents, 0666)) 103 t.Logf("[TEST] Wrote initial file: %s", allocFn) 104 105 // Migrate alloc to other node 106 job.Constraints[0].RTarget = "client2" 107 108 // Only register job - don't wait for running - since previous completed allocs 109 // will interfere 110 testutil.RegisterJob(t, server.RPC, job.Copy()) 111 112 // Wait for new alloc to be running 113 var newAlloc *structs.AllocListStub 114 testutil.WaitForResult(func() (bool, error) { 115 allocArgs := &structs.JobSpecificRequest{} 116 allocArgs.JobID = job.ID 117 allocArgs.QueryOptions.Region = "global" 118 var allocReply structs.JobAllocationsResponse 119 require.NoError(server.RPC("Job.Allocations", allocArgs, &allocReply)) 120 if n := len(allocReply.Allocations); n != 2 { 121 return false, fmt.Errorf("expected 2 allocs found %d", n) 122 } 123 124 // Pick the one that didn't exist before 125 if allocReply.Allocations[0].ID == origAlloc { 126 newAlloc = allocReply.Allocations[1] 127 } else { 128 newAlloc = allocReply.Allocations[0] 129 } 130 131 return newAlloc.ClientStatus != structs.AllocClientStatusRunning, 132 fmt.Errorf("client status: %v", newAlloc.ClientStatus) 133 }, func(err error) { 134 t.Fatalf("new alloc not running: %v", err) 135 }) 136 137 // Wait for file to appear on other client 138 allocFn2 := filepath.Join(client2.DataDir, "alloc", newAlloc.ID, "alloc", "data", "bar") 139 t.Logf("[TEST] Comparing against file: %s", allocFn2) 140 testutil.WaitForResult(func() (bool, error) { 141 found, err := ioutil.ReadFile(allocFn2) 142 if err != nil { 143 return false, err 144 } 145 return bytes.Equal(contents, found), fmt.Errorf("contents misatch. expected:\n%s\n\nfound:\n%s\n", 146 contents, found) 147 }, func(err error) { 148 t.Fatalf("file didn't migrate: %v", err) 149 }) 150} 151