1package exec
2
import (
	"context"
	"errors"
	"fmt"
	"io"
	"path"
	"path/filepath"
	"strings"

	"code.cloudfoundry.org/lager"
	"code.cloudfoundry.org/lager/lagerctx"
	"github.com/concourse/concourse/atc"
	"github.com/concourse/concourse/atc/creds"
	"github.com/concourse/concourse/atc/db"
	"github.com/concourse/concourse/atc/db/lock"
	"github.com/concourse/concourse/atc/exec/build"
	"github.com/concourse/concourse/atc/runtime"
	"github.com/concourse/concourse/atc/worker"
	"github.com/concourse/concourse/tracing"
	"github.com/concourse/concourse/vars"
)
23
// MissingInputsError reports that one or more inputs required by a task
// could not be found.
type MissingInputsError struct {
	// Inputs holds the names of the inputs that could not be found.
	Inputs []string
}

// Error renders the missing input names as a comma-separated list prefixed
// with "missing inputs: ".
func (err MissingInputsError) Error() string {
	names := strings.Join(err.Inputs, ", ")
	return fmt.Sprintf("missing inputs: %s", names)
}
34
// MissingTaskImageSourceError reports that the artifact a task names as its
// image could not be found.
type MissingTaskImageSourceError struct {
	// SourceName is the name of the artifact that was looked up.
	SourceName string
}

// Error names the missing artifact and, after a blank line, hints at how the
// artifact is expected to be produced.
func (err MissingTaskImageSourceError) Error() string {
	const format = "missing image artifact source: %s\n\n" +
		"make sure there's a corresponding 'get' step, or a task that produces it as an output"

	return fmt.Sprintf(format, err.SourceName)
}
44
// TaskImageSourceParametersError wraps a failure to evaluate the parameters
// of a task's image resource.
type TaskImageSourceParametersError struct {
	// Err is the underlying evaluation failure.
	Err error
}

// Error prefixes the underlying error's message with a description of what
// was being evaluated.
func (err TaskImageSourceParametersError) Error() string {
	return fmt.Sprintf("failed to evaluate image resource parameters: %s", err.Err)
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// inspect the cause through this wrapper.
func (err TaskImageSourceParametersError) Unwrap() error {
	return err.Err
}
52
//go:generate counterfeiter . TaskDelegate

// TaskDelegate is the set of callbacks and output sinks a TaskStep needs
// while it runs. The same delegate is also handed to the worker client, both
// directly and as the image fetcher's delegate.
type TaskDelegate interface {
	// ImageVersionDetermined and RedactImageSource are not called by the
	// TaskStep in this file.
	// NOTE(review): presumably used during image fetching, since the delegate
	// is passed as worker.ImageFetcherSpec.Delegate — confirm against the
	// worker package.
	ImageVersionDetermined(db.UsedResourceCache) error
	RedactImageSource(source atc.Source) (atc.Source, error)

	// Stdout and Stderr receive the task process's output. Stderr also
	// receives task config warnings, each prefixed with "[WARNING]".
	Stdout() io.Writer
	Stderr() io.Writer

	// Variables supplies the build variables used to evaluate resource types
	// and to interpolate the task config.
	Variables() *vars.BuildVariables

	// SetTaskConfig is called with the result of fetching the task config,
	// before the fetch error is checked.
	SetTaskConfig(config atc.TaskConfig)

	// Initializing is called once the task config has been fetched and
	// default limits applied, before the worker and container specs are built.
	Initializing(lager.Logger)
	// Starting, SelectedWorker and Errored are not called by the TaskStep in
	// this file.
	// NOTE(review): presumably invoked by the worker client, which receives
	// the delegate in RunTaskStep — confirm.
	Starting(lager.Logger)
	// Finished is called with the process's exit status after RunTaskStep
	// returns without error.
	Finished(lager.Logger, ExitStatus)
	SelectedWorker(lager.Logger, string)
	Errored(lager.Logger, string)
}
72
// TaskStep executes a TaskConfig, whose inputs are looked up in the build's
// artifact repository and whose outputs are registered back into it.
type TaskStep struct {
	// planID identifies this step; it is part of the container owner.
	planID atc.PlanID
	// plan is the task plan: name, config or config path, vars, params,
	// tags, input/output mappings, image artifact name, privileged flag and
	// versioned resource types.
	plan atc.TaskPlan
	// defaultLimits supplies the CPU and memory limits used when the task
	// config does not set them.
	defaultLimits atc.ContainerLimits
	// metadata carries the team, pipeline, job and build identifiers used
	// for tracing, logging, container ownership and caches.
	metadata StepMetadata
	// containerMetadata provides the container's working directory and type.
	containerMetadata db.ContainerMetadata
	// strategy is passed through to the worker client when running the task.
	strategy worker.ContainerPlacementStrategy
	// workerClient runs the task and is used to read file-based task configs.
	workerClient worker.Client
	// delegate receives lifecycle callbacks and the task's output streams.
	delegate TaskDelegate
	// lockFactory is passed through to the worker client.
	lockFactory lock.LockFactory
	// succeeded is set to true when the task process exits with status 0.
	succeeded bool
}
87
88func NewTaskStep(
89	planID atc.PlanID,
90	plan atc.TaskPlan,
91	defaultLimits atc.ContainerLimits,
92	metadata StepMetadata,
93	containerMetadata db.ContainerMetadata,
94	strategy worker.ContainerPlacementStrategy,
95	workerClient worker.Client,
96	delegate TaskDelegate,
97	lockFactory lock.LockFactory,
98) Step {
99	return &TaskStep{
100		planID:            planID,
101		plan:              plan,
102		defaultLimits:     defaultLimits,
103		metadata:          metadata,
104		containerMetadata: containerMetadata,
105		strategy:          strategy,
106		workerClient:      workerClient,
107		delegate:          delegate,
108		lockFactory:       lockFactory,
109	}
110}
111
112// Run will first select the worker based on the TaskConfig's platform and the
113// TaskStep's tags, and prioritize it by availability of volumes for the TaskConfig's
114// inputs. Inputs that did not have volumes available on the worker will be streamed
115// in to the container.
116//
117// If any inputs are not available in the artifact.Repository, MissingInputsError
118// is returned.
119//
120// Once all the inputs are satisfied, the task's script will be executed. If
121// the task is canceled via the context, the script will be interrupted.
122//
123// If the script exits successfully, the outputs specified in the TaskConfig
124// are registered with the artifact.Repository. If no outputs are specified, the
125// task's entire working directory is registered as an StreamableArtifactSource under the
126// name of the task.
127func (step *TaskStep) Run(ctx context.Context, state RunState) error {
128	ctx, span := tracing.StartSpan(ctx, "task", tracing.Attrs{
129		"team":     step.metadata.TeamName,
130		"pipeline": step.metadata.PipelineName,
131		"job":      step.metadata.JobName,
132		"build":    step.metadata.BuildName,
133		"name":     step.plan.Name,
134	})
135
136	err := step.run(ctx, state)
137	tracing.End(span, err)
138
139	return err
140}
141
142func (step *TaskStep) run(ctx context.Context, state RunState) error {
143	logger := lagerctx.FromContext(ctx)
144	logger = logger.Session("task-step", lager.Data{
145		"step-name": step.plan.Name,
146		"job-id":    step.metadata.JobID,
147	})
148
149	variables := step.delegate.Variables()
150	resourceTypes, err := creds.NewVersionedResourceTypes(variables, step.plan.VersionedResourceTypes).Evaluate()
151	if err != nil {
152		return err
153	}
154
155	var taskConfigSource TaskConfigSource
156	var taskVars []vars.Variables
157
158	if step.plan.ConfigPath != "" {
159		// external task - construct a source which reads it from file, and apply base resource type defaults.
160		taskConfigSource = FileConfigSource{ConfigPath: step.plan.ConfigPath, Client: step.workerClient}
161
162		// for interpolation - use 'vars' from the pipeline, and then fill remaining with cred variables.
163		// this 2-phase strategy allows to interpolate 'vars' by cred variables.
164		if len(step.plan.Vars) > 0 {
165			taskConfigSource = InterpolateTemplateConfigSource{
166				ConfigSource:  taskConfigSource,
167				Vars:          []vars.Variables{vars.StaticVariables(step.plan.Vars)},
168				ExpectAllKeys: false,
169			}
170		}
171		taskVars = []vars.Variables{variables}
172	} else {
173		// embedded task - first we take it
174		taskConfigSource = StaticConfigSource{Config: step.plan.Config}
175
176		// for interpolation - use just cred variables
177		taskVars = []vars.Variables{variables}
178	}
179
180	// apply resource type defaults
181	taskConfigSource = BaseResourceTypeDefaultsApplySource{
182		ConfigSource:  taskConfigSource,
183		ResourceTypes: step.plan.VersionedResourceTypes,
184	}
185
186	// override params
187	taskConfigSource = &OverrideParamsConfigSource{ConfigSource: taskConfigSource, Params: step.plan.Params}
188
189	// interpolate template vars
190	taskConfigSource = InterpolateTemplateConfigSource{
191		ConfigSource:  taskConfigSource,
192		Vars:          taskVars,
193		ExpectAllKeys: true,
194	}
195
196	// validate
197	taskConfigSource = ValidatingConfigSource{ConfigSource: taskConfigSource}
198
199	repository := state.ArtifactRepository()
200
201	config, err := taskConfigSource.FetchConfig(ctx, logger, repository)
202
203	step.delegate.SetTaskConfig(config)
204
205	for _, warning := range taskConfigSource.Warnings() {
206		fmt.Fprintln(step.delegate.Stderr(), "[WARNING]", warning)
207	}
208
209	if err != nil {
210		return err
211	}
212
213	if config.Limits == nil {
214		config.Limits = &atc.ContainerLimits{}
215	}
216	if config.Limits.CPU == nil {
217		config.Limits.CPU = step.defaultLimits.CPU
218	}
219	if config.Limits.Memory == nil {
220		config.Limits.Memory = step.defaultLimits.Memory
221	}
222
223	step.delegate.Initializing(logger)
224
225	workerSpec, err := step.workerSpec(logger, resourceTypes, repository, config)
226	if err != nil {
227		return err
228	}
229
230	containerSpec, err := step.containerSpec(logger, repository, config, step.containerMetadata)
231	if err != nil {
232		return err
233	}
234	tracing.Inject(ctx, &containerSpec)
235
236	processSpec := runtime.ProcessSpec{
237		Path:         config.Run.Path,
238		Args:         config.Run.Args,
239		Dir:          config.Run.Dir,
240		StdoutWriter: step.delegate.Stdout(),
241		StderrWriter: step.delegate.Stderr(),
242	}
243
244	imageSpec := worker.ImageFetcherSpec{
245		ResourceTypes: resourceTypes,
246		Delegate:      step.delegate,
247	}
248
249	owner := db.NewBuildStepContainerOwner(step.metadata.BuildID, step.planID, step.metadata.TeamID)
250
251	result, err := step.workerClient.RunTaskStep(
252		ctx,
253		logger,
254		owner,
255		containerSpec,
256		workerSpec,
257		step.strategy,
258		step.containerMetadata,
259		imageSpec,
260		processSpec,
261		step.delegate,
262		step.lockFactory,
263	)
264
265	if err != nil {
266		if err == context.Canceled || err == context.DeadlineExceeded {
267			step.registerOutputs(logger, repository, config, result.VolumeMounts, step.containerMetadata)
268		}
269		return err
270	}
271
272	step.succeeded = result.ExitStatus == 0
273	step.delegate.Finished(logger, ExitStatus(result.ExitStatus))
274
275	step.registerOutputs(logger, repository, config, result.VolumeMounts, step.containerMetadata)
276
277	// Do not initialize caches for one-off builds
278	if step.metadata.JobID != 0 {
279		err = step.registerCaches(logger, repository, config, result.VolumeMounts, step.containerMetadata)
280		if err != nil {
281			return err
282		}
283	}
284
285	return nil
286}
287
288func (step *TaskStep) Succeeded() bool {
289	return step.succeeded
290}
291
292func (step *TaskStep) imageSpec(logger lager.Logger, repository *build.Repository, config atc.TaskConfig) (worker.ImageSpec, error) {
293	imageSpec := worker.ImageSpec{
294		Privileged: bool(step.plan.Privileged),
295	}
296
297	// Determine the source of the container image
298	// a reference to an artifact (get step, task output) ?
299	if step.plan.ImageArtifactName != "" {
300		art, found := repository.ArtifactFor(build.ArtifactName(step.plan.ImageArtifactName))
301		if !found {
302			return worker.ImageSpec{}, MissingTaskImageSourceError{step.plan.ImageArtifactName}
303		}
304
305		imageSpec.ImageArtifact = art
306
307		//an image_resource
308	} else if config.ImageResource != nil {
309		imageSpec.ImageResource = &worker.ImageResource{
310			Type:    config.ImageResource.Type,
311			Source:  config.ImageResource.Source,
312			Params:  config.ImageResource.Params,
313			Version: config.ImageResource.Version,
314		}
315		// a rootfs_uri
316	} else if config.RootfsURI != "" {
317		imageSpec.ImageURL = config.RootfsURI
318	}
319
320	return imageSpec, nil
321}
322
323func (step *TaskStep) containerInputs(logger lager.Logger, repository *build.Repository, config atc.TaskConfig, metadata db.ContainerMetadata) (map[string]runtime.Artifact, error) {
324	inputs := map[string]runtime.Artifact{}
325
326	var missingRequiredInputs []string
327
328	for _, input := range config.Inputs {
329		inputName := input.Name
330		if sourceName, ok := step.plan.InputMapping[inputName]; ok {
331			inputName = sourceName
332		}
333
334		art, found := repository.ArtifactFor(build.ArtifactName(inputName))
335		if !found {
336			if !input.Optional {
337				missingRequiredInputs = append(missingRequiredInputs, inputName)
338			}
339			continue
340		}
341		ti := taskInput{
342			config:        input,
343			artifact:      art,
344			artifactsRoot: metadata.WorkingDirectory,
345		}
346
347		inputs[ti.Path()] = ti.Artifact()
348	}
349
350	if len(missingRequiredInputs) > 0 {
351		return nil, MissingInputsError{missingRequiredInputs}
352	}
353
354	for _, cacheConfig := range config.Caches {
355		cacheArt := &runtime.CacheArtifact{
356			TeamID:   step.metadata.TeamID,
357			JobID:    step.metadata.JobID,
358			StepName: step.plan.Name,
359			Path:     cacheConfig.Path,
360		}
361		ti := taskCacheInput{
362			artifact:      cacheArt,
363			artifactsRoot: metadata.WorkingDirectory,
364			cachePath:     cacheConfig.Path,
365		}
366		inputs[ti.Path()] = ti.Artifact()
367	}
368
369	return inputs, nil
370}
371
372func (step *TaskStep) containerSpec(logger lager.Logger, repository *build.Repository, config atc.TaskConfig, metadata db.ContainerMetadata) (worker.ContainerSpec, error) {
373	imageSpec, err := step.imageSpec(logger, repository, config)
374	if err != nil {
375		return worker.ContainerSpec{}, err
376	}
377
378	var limits worker.ContainerLimits
379	if config.Limits != nil {
380		limits.CPU = config.Limits.CPU
381		limits.Memory = config.Limits.Memory
382	}
383
384	containerSpec := worker.ContainerSpec{
385		Platform:  config.Platform,
386		Tags:      step.plan.Tags,
387		TeamID:    step.metadata.TeamID,
388		ImageSpec: imageSpec,
389		Limits:    limits,
390		User:      config.Run.User,
391		Dir:       metadata.WorkingDirectory,
392		Env:       config.Params.Env(),
393		Type:      metadata.Type,
394
395		Outputs: worker.OutputPaths{},
396	}
397
398	containerSpec.ArtifactByPath, err = step.containerInputs(logger, repository, config, metadata)
399	if err != nil {
400		return worker.ContainerSpec{}, err
401	}
402
403	for _, output := range config.Outputs {
404		path := artifactsPath(output, metadata.WorkingDirectory)
405		containerSpec.Outputs[output.Name] = path
406	}
407
408	return containerSpec, nil
409}
410
411func (step *TaskStep) workerSpec(logger lager.Logger, resourceTypes atc.VersionedResourceTypes, repository *build.Repository, config atc.TaskConfig) (worker.WorkerSpec, error) {
412	workerSpec := worker.WorkerSpec{
413		Platform:      config.Platform,
414		Tags:          step.plan.Tags,
415		TeamID:        step.metadata.TeamID,
416		ResourceTypes: resourceTypes,
417	}
418
419	imageSpec, err := step.imageSpec(logger, repository, config)
420	if err != nil {
421		return worker.WorkerSpec{}, err
422	}
423
424	if imageSpec.ImageResource != nil {
425		workerSpec.ResourceType = imageSpec.ImageResource.Type
426	}
427
428	return workerSpec, nil
429}
430
431func (step *TaskStep) registerOutputs(logger lager.Logger, repository *build.Repository, config atc.TaskConfig, volumeMounts []worker.VolumeMount, metadata db.ContainerMetadata) {
432	logger.Debug("registering-outputs", lager.Data{"outputs": config.Outputs})
433
434	for _, output := range config.Outputs {
435		outputName := output.Name
436		if destinationName, ok := step.plan.OutputMapping[output.Name]; ok {
437			outputName = destinationName
438		}
439
440		outputPath := artifactsPath(output, metadata.WorkingDirectory)
441
442		for _, mount := range volumeMounts {
443			if filepath.Clean(mount.MountPath) == filepath.Clean(outputPath) {
444				art := &runtime.TaskArtifact{
445					VolumeHandle: mount.Volume.Handle(),
446				}
447				repository.RegisterArtifact(build.ArtifactName(outputName), art)
448			}
449		}
450	}
451}
452
453func (step *TaskStep) registerCaches(logger lager.Logger, repository *build.Repository, config atc.TaskConfig, volumeMounts []worker.VolumeMount, metadata db.ContainerMetadata) error {
454	logger.Debug("initializing-caches", lager.Data{"caches": config.Caches})
455
456	for _, cacheConfig := range config.Caches {
457		for _, volumeMount := range volumeMounts {
458			if volumeMount.MountPath == filepath.Join(metadata.WorkingDirectory, cacheConfig.Path) {
459				logger.Debug("initializing-cache", lager.Data{"path": volumeMount.MountPath})
460
461				err := volumeMount.Volume.InitializeTaskCache(
462					logger,
463					step.metadata.JobID,
464					step.plan.Name,
465					cacheConfig.Path,
466					bool(step.plan.Privileged))
467				if err != nil {
468					return err
469				}
470
471				continue
472			}
473		}
474	}
475	return nil
476}
477
478type taskInput struct {
479	config        atc.TaskInputConfig
480	artifact      runtime.Artifact
481	artifactsRoot string
482}
483
484func (s taskInput) Artifact() runtime.Artifact { return s.artifact }
485
486func (s taskInput) Path() string {
487	subdir := s.config.Path
488	if s.config.Path == "" {
489		subdir = s.config.Name
490	}
491
492	return filepath.Join(s.artifactsRoot, subdir)
493}
494
495func artifactsPath(outputConfig atc.TaskOutputConfig, artifactsRoot string) string {
496	outputSrc := outputConfig.Path
497	if len(outputSrc) == 0 {
498		outputSrc = outputConfig.Name
499	}
500
501	return path.Join(artifactsRoot, outputSrc) + "/"
502}
503
504type taskCacheInput struct {
505	artifact      runtime.Artifact
506	artifactsRoot string
507	cachePath     string
508}
509
510func (s taskCacheInput) Artifact() runtime.Artifact { return s.artifact }
511
512func (s taskCacheInput) Path() string {
513	return filepath.Join(s.artifactsRoot, s.cachePath)
514}
515