1package testutils 2 3import ( 4 "context" 5 "fmt" 6 "io/ioutil" 7 "path/filepath" 8 "runtime" 9 "strings" 10 "time" 11 12 testing "github.com/mitchellh/go-testing-interface" 13 14 hclog "github.com/hashicorp/go-hclog" 15 plugin "github.com/hashicorp/go-plugin" 16 "github.com/hashicorp/nomad/client/allocdir" 17 "github.com/hashicorp/nomad/client/config" 18 "github.com/hashicorp/nomad/client/logmon" 19 "github.com/hashicorp/nomad/client/taskenv" 20 "github.com/hashicorp/nomad/helper/testlog" 21 "github.com/hashicorp/nomad/helper/uuid" 22 "github.com/hashicorp/nomad/nomad/mock" 23 "github.com/hashicorp/nomad/nomad/structs" 24 "github.com/hashicorp/nomad/plugins/base" 25 "github.com/hashicorp/nomad/plugins/drivers" 26 "github.com/hashicorp/nomad/plugins/shared/hclspec" 27 "github.com/stretchr/testify/require" 28) 29 30type DriverHarness struct { 31 drivers.DriverPlugin 32 client *plugin.GRPCClient 33 server *plugin.GRPCServer 34 t testing.T 35 logger hclog.Logger 36 impl drivers.DriverPlugin 37} 38 39func (d *DriverHarness) Impl() drivers.DriverPlugin { 40 return d.impl 41} 42func NewDriverHarness(t testing.T, d drivers.DriverPlugin) *DriverHarness { 43 logger := testlog.HCLogger(t).Named("driver_harness") 44 45 pd := drivers.NewDriverPlugin(d, logger) 46 47 client, server := plugin.TestPluginGRPCConn(t, 48 map[string]plugin.Plugin{ 49 base.PluginTypeDriver: pd, 50 base.PluginTypeBase: &base.PluginBase{Impl: d}, 51 "logmon": logmon.NewPlugin(logmon.NewLogMon(logger.Named("logmon"))), 52 }, 53 ) 54 55 raw, err := client.Dispense(base.PluginTypeDriver) 56 if err != nil { 57 t.Fatalf("err dispensing plugin: %v", err) 58 } 59 60 dClient := raw.(drivers.DriverPlugin) 61 h := &DriverHarness{ 62 client: client, 63 server: server, 64 DriverPlugin: dClient, 65 logger: logger, 66 t: t, 67 impl: d, 68 } 69 70 return h 71} 72 73func (h *DriverHarness) Kill() { 74 h.client.Close() 75 h.server.Stop() 76} 77 78// MkAllocDir creates a temporary directory and allocdir structure. 79// If enableLogs is set to true a logmon instance will be started to write logs 80// to the LogDir of the task 81// A cleanup func is returned and should be defered so as to not leak dirs 82// between tests. 83func (h *DriverHarness) MkAllocDir(t *drivers.TaskConfig, enableLogs bool) func() { 84 dir, err := ioutil.TempDir("", "nomad_driver_harness-") 85 require.NoError(h.t, err) 86 t.AllocDir = dir 87 88 allocDir := allocdir.NewAllocDir(h.logger, dir) 89 require.NoError(h.t, allocDir.Build()) 90 taskDir := allocDir.NewTaskDir(t.Name) 91 92 caps, err := h.Capabilities() 93 require.NoError(h.t, err) 94 95 fsi := caps.FSIsolation 96 require.NoError(h.t, taskDir.Build(fsi == drivers.FSIsolationChroot, config.DefaultChrootEnv)) 97 98 task := &structs.Task{ 99 Name: t.Name, 100 Env: t.Env, 101 } 102 103 // Create the mock allocation 104 alloc := mock.Alloc() 105 if t.Resources != nil { 106 alloc.AllocatedResources.Tasks[task.Name] = t.Resources.NomadResources 107 } 108 109 taskBuilder := taskenv.NewBuilder(mock.Node(), alloc, task, "global") 110 SetEnvvars(taskBuilder, fsi, taskDir, config.DefaultConfig()) 111 112 taskEnv := taskBuilder.Build() 113 if t.Env == nil { 114 t.Env = taskEnv.Map() 115 } else { 116 for k, v := range taskEnv.Map() { 117 if _, ok := t.Env[k]; !ok { 118 t.Env[k] = v 119 } 120 } 121 } 122 123 //logmon 124 if enableLogs { 125 lm := logmon.NewLogMon(h.logger.Named("logmon")) 126 if runtime.GOOS == "windows" { 127 id := uuid.Generate()[:8] 128 t.StdoutPath = fmt.Sprintf("//./pipe/%s-%s.stdout", t.Name, id) 129 t.StderrPath = fmt.Sprintf("//./pipe/%s-%s.stderr", t.Name, id) 130 } else { 131 t.StdoutPath = filepath.Join(taskDir.LogDir, fmt.Sprintf(".%s.stdout.fifo", t.Name)) 132 t.StderrPath = filepath.Join(taskDir.LogDir, fmt.Sprintf(".%s.stderr.fifo", t.Name)) 133 } 134 err = lm.Start(&logmon.LogConfig{ 135 LogDir: taskDir.LogDir, 136 StdoutLogFile: fmt.Sprintf("%s.stdout", t.Name), 137 StderrLogFile: fmt.Sprintf("%s.stderr", t.Name), 138 StdoutFifo: t.StdoutPath, 139 StderrFifo: t.StderrPath, 140 MaxFiles: 10, 141 MaxFileSizeMB: 10, 142 }) 143 require.NoError(h.t, err) 144 145 return func() { 146 lm.Stop() 147 h.client.Close() 148 allocDir.Destroy() 149 } 150 } 151 152 return func() { 153 h.client.Close() 154 allocDir.Destroy() 155 } 156} 157 158// WaitUntilStarted will block until the task for the given ID is in the running 159// state or the timeout is reached 160func (h *DriverHarness) WaitUntilStarted(taskID string, timeout time.Duration) error { 161 deadline := time.Now().Add(timeout) 162 var lastState drivers.TaskState 163 for { 164 status, err := h.InspectTask(taskID) 165 if err != nil { 166 return err 167 } 168 if status.State == drivers.TaskStateRunning { 169 return nil 170 } 171 lastState = status.State 172 if time.Now().After(deadline) { 173 return fmt.Errorf("task never transitioned to running, currently '%s'", lastState) 174 } 175 time.Sleep(100 * time.Millisecond) 176 } 177} 178 179// MockDriver is used for testing. 180// Each function can be set as a closure to make assertions about how data 181// is passed through the base plugin layer. 182type MockDriver struct { 183 base.MockPlugin 184 TaskConfigSchemaF func() (*hclspec.Spec, error) 185 FingerprintF func(context.Context) (<-chan *drivers.Fingerprint, error) 186 CapabilitiesF func() (*drivers.Capabilities, error) 187 RecoverTaskF func(*drivers.TaskHandle) error 188 StartTaskF func(*drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) 189 WaitTaskF func(context.Context, string) (<-chan *drivers.ExitResult, error) 190 StopTaskF func(string, time.Duration, string) error 191 DestroyTaskF func(string, bool) error 192 InspectTaskF func(string) (*drivers.TaskStatus, error) 193 TaskStatsF func(context.Context, string, time.Duration) (<-chan *drivers.TaskResourceUsage, error) 194 TaskEventsF func(context.Context) (<-chan *drivers.TaskEvent, error) 195 SignalTaskF func(string, string) error 196 ExecTaskF func(string, []string, time.Duration) (*drivers.ExecTaskResult, error) 197 ExecTaskStreamingF func(context.Context, string, *drivers.ExecOptions) (*drivers.ExitResult, error) 198 MockNetworkManager 199} 200 201type MockNetworkManager struct { 202 CreateNetworkF func(string) (*drivers.NetworkIsolationSpec, bool, error) 203 DestroyNetworkF func(string, *drivers.NetworkIsolationSpec) error 204} 205 206func (m *MockNetworkManager) CreateNetwork(id string) (*drivers.NetworkIsolationSpec, bool, error) { 207 return m.CreateNetworkF(id) 208} 209func (m *MockNetworkManager) DestroyNetwork(id string, spec *drivers.NetworkIsolationSpec) error { 210 return m.DestroyNetworkF(id, spec) 211} 212 213func (d *MockDriver) TaskConfigSchema() (*hclspec.Spec, error) { return d.TaskConfigSchemaF() } 214func (d *MockDriver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { 215 return d.FingerprintF(ctx) 216} 217func (d *MockDriver) Capabilities() (*drivers.Capabilities, error) { return d.CapabilitiesF() } 218func (d *MockDriver) RecoverTask(h *drivers.TaskHandle) error { return d.RecoverTaskF(h) } 219func (d *MockDriver) StartTask(c *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { 220 return d.StartTaskF(c) 221} 222func (d *MockDriver) WaitTask(ctx context.Context, id string) (<-chan *drivers.ExitResult, error) { 223 return d.WaitTaskF(ctx, id) 224} 225func (d *MockDriver) StopTask(taskID string, timeout time.Duration, signal string) error { 226 return d.StopTaskF(taskID, timeout, signal) 227} 228func (d *MockDriver) DestroyTask(taskID string, force bool) error { 229 return d.DestroyTaskF(taskID, force) 230} 231func (d *MockDriver) InspectTask(taskID string) (*drivers.TaskStatus, error) { 232 return d.InspectTaskF(taskID) 233} 234func (d *MockDriver) TaskStats(ctx context.Context, taskID string, i time.Duration) (<-chan *drivers.TaskResourceUsage, error) { 235 return d.TaskStats(ctx, taskID, i) 236} 237func (d *MockDriver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { 238 return d.TaskEventsF(ctx) 239} 240func (d *MockDriver) SignalTask(taskID string, signal string) error { 241 return d.SignalTask(taskID, signal) 242} 243func (d *MockDriver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { 244 return d.ExecTaskF(taskID, cmd, timeout) 245} 246 247func (d *MockDriver) ExecTaskStreaming(ctx context.Context, taskID string, execOpts *drivers.ExecOptions) (*drivers.ExitResult, error) { 248 return d.ExecTaskStreamingF(ctx, taskID, execOpts) 249} 250 251// SetEnvvars sets path and host env vars depending on the FS isolation used. 252func SetEnvvars(envBuilder *taskenv.Builder, fsi drivers.FSIsolation, taskDir *allocdir.TaskDir, conf *config.Config) { 253 // Set driver-specific environment variables 254 switch fsi { 255 case drivers.FSIsolationNone: 256 // Use host paths 257 envBuilder.SetAllocDir(taskDir.SharedAllocDir) 258 envBuilder.SetTaskLocalDir(taskDir.LocalDir) 259 envBuilder.SetSecretsDir(taskDir.SecretsDir) 260 default: 261 // filesystem isolation; use container paths 262 envBuilder.SetAllocDir(allocdir.SharedAllocContainerPath) 263 envBuilder.SetTaskLocalDir(allocdir.TaskLocalContainerPath) 264 envBuilder.SetSecretsDir(allocdir.TaskSecretsContainerPath) 265 } 266 267 // Set the host environment variables for non-image based drivers 268 if fsi != drivers.FSIsolationImage { 269 filter := strings.Split(conf.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",") 270 envBuilder.SetHostEnvvars(filter) 271 } 272} 273