1package exec 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "path" 8 "path/filepath" 9 "strings" 10 11 "code.cloudfoundry.org/lager" 12 "code.cloudfoundry.org/lager/lagerctx" 13 "github.com/concourse/concourse/atc" 14 "github.com/concourse/concourse/atc/creds" 15 "github.com/concourse/concourse/atc/db" 16 "github.com/concourse/concourse/atc/db/lock" 17 "github.com/concourse/concourse/atc/exec/build" 18 "github.com/concourse/concourse/atc/runtime" 19 "github.com/concourse/concourse/atc/worker" 20 "github.com/concourse/concourse/tracing" 21 "github.com/concourse/concourse/vars" 22) 23 24// MissingInputsError is returned when any of the task's required inputs are 25// missing. 26type MissingInputsError struct { 27 Inputs []string 28} 29 30// Error prints a human-friendly message listing the inputs that were missing. 31func (err MissingInputsError) Error() string { 32 return fmt.Sprintf("missing inputs: %s", strings.Join(err.Inputs, ", ")) 33} 34 35type MissingTaskImageSourceError struct { 36 SourceName string 37} 38 39func (err MissingTaskImageSourceError) Error() string { 40 return fmt.Sprintf(`missing image artifact source: %s 41 42make sure there's a corresponding 'get' step, or a task that produces it as an output`, err.SourceName) 43} 44 45type TaskImageSourceParametersError struct { 46 Err error 47} 48 49func (err TaskImageSourceParametersError) Error() string { 50 return fmt.Sprintf("failed to evaluate image resource parameters: %s", err.Err) 51} 52 53//go:generate counterfeiter . 
TaskDelegate 54 55type TaskDelegate interface { 56 ImageVersionDetermined(db.UsedResourceCache) error 57 RedactImageSource(source atc.Source) (atc.Source, error) 58 59 Stdout() io.Writer 60 Stderr() io.Writer 61 62 Variables() *vars.BuildVariables 63 64 SetTaskConfig(config atc.TaskConfig) 65 66 Initializing(lager.Logger) 67 Starting(lager.Logger) 68 Finished(lager.Logger, ExitStatus) 69 SelectedWorker(lager.Logger, string) 70 Errored(lager.Logger, string) 71} 72 73// TaskStep executes a TaskConfig, whose inputs will be fetched from the 74// artifact.Repository and outputs will be added to the artifact.Repository. 75type TaskStep struct { 76 planID atc.PlanID 77 plan atc.TaskPlan 78 defaultLimits atc.ContainerLimits 79 metadata StepMetadata 80 containerMetadata db.ContainerMetadata 81 strategy worker.ContainerPlacementStrategy 82 workerClient worker.Client 83 delegate TaskDelegate 84 lockFactory lock.LockFactory 85 succeeded bool 86} 87 88func NewTaskStep( 89 planID atc.PlanID, 90 plan atc.TaskPlan, 91 defaultLimits atc.ContainerLimits, 92 metadata StepMetadata, 93 containerMetadata db.ContainerMetadata, 94 strategy worker.ContainerPlacementStrategy, 95 workerClient worker.Client, 96 delegate TaskDelegate, 97 lockFactory lock.LockFactory, 98) Step { 99 return &TaskStep{ 100 planID: planID, 101 plan: plan, 102 defaultLimits: defaultLimits, 103 metadata: metadata, 104 containerMetadata: containerMetadata, 105 strategy: strategy, 106 workerClient: workerClient, 107 delegate: delegate, 108 lockFactory: lockFactory, 109 } 110} 111 112// Run will first select the worker based on the TaskConfig's platform and the 113// TaskStep's tags, and prioritize it by availability of volumes for the TaskConfig's 114// inputs. Inputs that did not have volumes available on the worker will be streamed 115// in to the container. 116// 117// If any inputs are not available in the artifact.Repository, MissingInputsError 118// is returned. 
// Run will first select the worker based on the TaskConfig's platform and the
// TaskStep's tags, and prioritize it by availability of volumes for the TaskConfig's
// inputs. Inputs that did not have volumes available on the worker will be streamed
// in to the container.
//
// If any inputs are not available in the artifact.Repository, MissingInputsError
// is returned.
//
// Once all the inputs are satisfied, the task's script will be executed. If
// the task is canceled via the context, the script will be interrupted.
//
// If the script exits successfully, the outputs specified in the TaskConfig
// are registered with the artifact.Repository. If no outputs are specified, the
// task's entire working directory is registered as an StreamableArtifactSource under the
// name of the task.
func (step *TaskStep) Run(ctx context.Context, state RunState) error {
	// Wrap the whole step in a tracing span; the real work happens in run().
	ctx, span := tracing.StartSpan(ctx, "task", tracing.Attrs{
		"team":     step.metadata.TeamName,
		"pipeline": step.metadata.PipelineName,
		"job":      step.metadata.JobName,
		"build":    step.metadata.BuildName,
		"name":     step.plan.Name,
	})

	err := step.run(ctx, state)
	tracing.End(span, err)

	return err
}

// run resolves the task config (external file or embedded), builds the
// container/worker specs, executes the task via the worker client, and
// registers outputs and caches afterwards.
func (step *TaskStep) run(ctx context.Context, state RunState) error {
	logger := lagerctx.FromContext(ctx)
	logger = logger.Session("task-step", lager.Data{
		"step-name": step.plan.Name,
		"job-id":    step.metadata.JobID,
	})

	variables := step.delegate.Variables()
	resourceTypes, err := creds.NewVersionedResourceTypes(variables, step.plan.VersionedResourceTypes).Evaluate()
	if err != nil {
		return err
	}

	// The config source is built up as a chain of decorators; the branches
	// below pick the innermost source, and the decorators applied afterwards
	// run in the order they are wrapped.
	var taskConfigSource TaskConfigSource
	var taskVars []vars.Variables

	if step.plan.ConfigPath != "" {
		// external task - construct a source which reads it from file, and apply base resource type defaults.
		taskConfigSource = FileConfigSource{ConfigPath: step.plan.ConfigPath, Client: step.workerClient}

		// for interpolation - use 'vars' from the pipeline, and then fill remaining with cred variables.
		// this 2-phase strategy allows to interpolate 'vars' by cred variables.
		if len(step.plan.Vars) > 0 {
			taskConfigSource = InterpolateTemplateConfigSource{
				ConfigSource:  taskConfigSource,
				Vars:          []vars.Variables{vars.StaticVariables(step.plan.Vars)},
				ExpectAllKeys: false,
			}
		}
		taskVars = []vars.Variables{variables}
	} else {
		// embedded task - first we take it
		taskConfigSource = StaticConfigSource{Config: step.plan.Config}

		// for interpolation - use just cred variables
		taskVars = []vars.Variables{variables}
	}

	// apply resource type defaults
	taskConfigSource = BaseResourceTypeDefaultsApplySource{
		ConfigSource:  taskConfigSource,
		ResourceTypes: step.plan.VersionedResourceTypes,
	}

	// override params
	taskConfigSource = &OverrideParamsConfigSource{ConfigSource: taskConfigSource, Params: step.plan.Params}

	// interpolate template vars
	taskConfigSource = InterpolateTemplateConfigSource{
		ConfigSource:  taskConfigSource,
		Vars:          taskVars,
		ExpectAllKeys: true,
	}

	// validate
	taskConfigSource = ValidatingConfigSource{ConfigSource: taskConfigSource}

	repository := state.ArtifactRepository()

	config, err := taskConfigSource.FetchConfig(ctx, logger, repository)

	// NOTE: the delegate is given the config, and warnings are surfaced,
	// BEFORE the fetch error is checked — on error the config may be the
	// zero value. This ordering is deliberate; do not "fix" it by moving
	// the err check up.
	step.delegate.SetTaskConfig(config)

	for _, warning := range taskConfigSource.Warnings() {
		fmt.Fprintln(step.delegate.Stderr(), "[WARNING]", warning)
	}

	if err != nil {
		return err
	}

	// Fill in any limits the task config left unset from the system-wide
	// defaults.
	if config.Limits == nil {
		config.Limits = &atc.ContainerLimits{}
	}
	if config.Limits.CPU == nil {
		config.Limits.CPU = step.defaultLimits.CPU
	}
	if config.Limits.Memory == nil {
		config.Limits.Memory = step.defaultLimits.Memory
	}

	step.delegate.Initializing(logger)

	workerSpec, err := step.workerSpec(logger, resourceTypes, repository, config)
	if err != nil {
		return err
	}

	containerSpec, err := step.containerSpec(logger, repository, config, step.containerMetadata)
	if err != nil {
		return err
	}
	tracing.Inject(ctx, &containerSpec)

	processSpec := runtime.ProcessSpec{
		Path:         config.Run.Path,
		Args:         config.Run.Args,
		Dir:          config.Run.Dir,
		StdoutWriter: step.delegate.Stdout(),
		StderrWriter: step.delegate.Stderr(),
	}

	imageSpec := worker.ImageFetcherSpec{
		ResourceTypes: resourceTypes,
		Delegate:      step.delegate,
	}

	owner := db.NewBuildStepContainerOwner(step.metadata.BuildID, step.planID, step.metadata.TeamID)

	result, err := step.workerClient.RunTaskStep(
		ctx,
		logger,
		owner,
		containerSpec,
		workerSpec,
		step.strategy,
		step.containerMetadata,
		imageSpec,
		processSpec,
		step.delegate,
		step.lockFactory,
	)

	if err != nil {
		// On cancellation/timeout, still register whatever outputs were
		// produced. NOTE(review): presumably so hooks (ensure/on_abort) can
		// see partial outputs — confirm against the step hook machinery.
		if err == context.Canceled || err == context.DeadlineExceeded {
			step.registerOutputs(logger, repository, config, result.VolumeMounts, step.containerMetadata)
		}
		return err
	}

	step.succeeded = result.ExitStatus == 0
	step.delegate.Finished(logger, ExitStatus(result.ExitStatus))

	step.registerOutputs(logger, repository, config, result.VolumeMounts, step.containerMetadata)

	// Do not initialize caches for one-off builds
	if step.metadata.JobID != 0 {
		err = step.registerCaches(logger, repository, config, result.VolumeMounts, step.containerMetadata)
		if err != nil {
			return err
		}
	}

	return nil
}

// Succeeded reports whether the task's process exited with status 0.
func (step *TaskStep) Succeeded() bool {
	return step.succeeded
}

// imageSpec determines the container image for the task, preferring (in
// order): an artifact named by the plan's image_artifact_name, the config's
// image_resource, then a rootfs_uri.
func (step *TaskStep) imageSpec(logger lager.Logger, repository *build.Repository, config atc.TaskConfig) (worker.ImageSpec, error) {
	imageSpec := worker.ImageSpec{
		Privileged: bool(step.plan.Privileged),
	}

	// Determine the source of the container image
	// a reference to an artifact (get step, task output) ?
	if step.plan.ImageArtifactName != "" {
		art, found := repository.ArtifactFor(build.ArtifactName(step.plan.ImageArtifactName))
		if !found {
			return worker.ImageSpec{}, MissingTaskImageSourceError{step.plan.ImageArtifactName}
		}

		imageSpec.ImageArtifact = art

		//an image_resource
	} else if config.ImageResource != nil {
		imageSpec.ImageResource = &worker.ImageResource{
			Type:    config.ImageResource.Type,
			Source:  config.ImageResource.Source,
			Params:  config.ImageResource.Params,
			Version: config.ImageResource.Version,
		}
		// a rootfs_uri
	} else if config.RootfsURI != "" {
		imageSpec.ImageURL = config.RootfsURI
	}

	return imageSpec, nil
}

// containerInputs maps each task input (after input_mapping translation) and
// each configured cache to the artifact that should be mounted at its path
// under the working directory. Returns MissingInputsError if any required
// input has no artifact in the repository.
func (step *TaskStep) containerInputs(logger lager.Logger, repository *build.Repository, config atc.TaskConfig, metadata db.ContainerMetadata) (map[string]runtime.Artifact, error) {
	inputs := map[string]runtime.Artifact{}

	var missingRequiredInputs []string

	for _, input := range config.Inputs {
		inputName := input.Name
		// input_mapping renames a repository artifact to the input's name.
		if sourceName, ok := step.plan.InputMapping[inputName]; ok {
			inputName = sourceName
		}

		art, found := repository.ArtifactFor(build.ArtifactName(inputName))
		if !found {
			// Optional inputs may be absent; required ones are collected so
			// all missing inputs are reported at once.
			if !input.Optional {
				missingRequiredInputs = append(missingRequiredInputs, inputName)
			}
			continue
		}
		ti := taskInput{
			config:        input,
			artifact:      art,
			artifactsRoot: metadata.WorkingDirectory,
		}

		inputs[ti.Path()] = ti.Artifact()
	}

	if len(missingRequiredInputs) > 0 {
		return nil, MissingInputsError{missingRequiredInputs}
	}

	// Caches are mounted like inputs, keyed by team/job/step/path.
	for _, cacheConfig := range config.Caches {
		cacheArt := &runtime.CacheArtifact{
			TeamID:   step.metadata.TeamID,
			JobID:    step.metadata.JobID,
			StepName: step.plan.Name,
			Path:     cacheConfig.Path,
		}
		ti := taskCacheInput{
			artifact:      cacheArt,
			artifactsRoot: metadata.WorkingDirectory,
			cachePath:     cacheConfig.Path,
		}
		inputs[ti.Path()] = ti.Artifact()
	}

	return inputs, nil
}

// containerSpec builds the worker.ContainerSpec for the task: image, limits,
// env, inputs (artifacts and caches), and output paths.
func (step *TaskStep) containerSpec(logger lager.Logger, repository *build.Repository, config atc.TaskConfig, metadata db.ContainerMetadata) (worker.ContainerSpec, error) {
	imageSpec, err := step.imageSpec(logger, repository, config)
	if err != nil {
		return worker.ContainerSpec{}, err
	}

	// config.Limits is defaulted in run(), but guard against nil anyway.
	var limits worker.ContainerLimits
	if config.Limits != nil {
		limits.CPU = config.Limits.CPU
		limits.Memory = config.Limits.Memory
	}

	containerSpec := worker.ContainerSpec{
		Platform:  config.Platform,
		Tags:      step.plan.Tags,
		TeamID:    step.metadata.TeamID,
		ImageSpec: imageSpec,
		Limits:    limits,
		User:      config.Run.User,
		Dir:       metadata.WorkingDirectory,
		Env:       config.Params.Env(),
		Type:      metadata.Type,

		Outputs: worker.OutputPaths{},
	}

	containerSpec.ArtifactByPath, err = step.containerInputs(logger, repository, config, metadata)
	if err != nil {
		return worker.ContainerSpec{}, err
	}

	for _, output := range config.Outputs {
		path := artifactsPath(output, metadata.WorkingDirectory)
		containerSpec.Outputs[output.Name] = path
	}

	return containerSpec, nil
}

// workerSpec describes the requirements used to select a worker: platform,
// tags, team, resource types, and — when the image comes from an
// image_resource — the resource type needed to fetch that image.
func (step *TaskStep) workerSpec(logger lager.Logger, resourceTypes atc.VersionedResourceTypes, repository *build.Repository, config atc.TaskConfig) (worker.WorkerSpec, error) {
	workerSpec := worker.WorkerSpec{
		Platform:      config.Platform,
		Tags:          step.plan.Tags,
		TeamID:        step.metadata.TeamID,
		ResourceTypes: resourceTypes,
	}

	imageSpec, err := step.imageSpec(logger, repository, config)
	if err != nil {
		return worker.WorkerSpec{}, err
	}

	if imageSpec.ImageResource != nil {
		workerSpec.ResourceType = imageSpec.ImageResource.Type
	}

	return workerSpec, nil
}
atc.TaskConfig, volumeMounts []worker.VolumeMount, metadata db.ContainerMetadata) { 432 logger.Debug("registering-outputs", lager.Data{"outputs": config.Outputs}) 433 434 for _, output := range config.Outputs { 435 outputName := output.Name 436 if destinationName, ok := step.plan.OutputMapping[output.Name]; ok { 437 outputName = destinationName 438 } 439 440 outputPath := artifactsPath(output, metadata.WorkingDirectory) 441 442 for _, mount := range volumeMounts { 443 if filepath.Clean(mount.MountPath) == filepath.Clean(outputPath) { 444 art := &runtime.TaskArtifact{ 445 VolumeHandle: mount.Volume.Handle(), 446 } 447 repository.RegisterArtifact(build.ArtifactName(outputName), art) 448 } 449 } 450 } 451} 452 453func (step *TaskStep) registerCaches(logger lager.Logger, repository *build.Repository, config atc.TaskConfig, volumeMounts []worker.VolumeMount, metadata db.ContainerMetadata) error { 454 logger.Debug("initializing-caches", lager.Data{"caches": config.Caches}) 455 456 for _, cacheConfig := range config.Caches { 457 for _, volumeMount := range volumeMounts { 458 if volumeMount.MountPath == filepath.Join(metadata.WorkingDirectory, cacheConfig.Path) { 459 logger.Debug("initializing-cache", lager.Data{"path": volumeMount.MountPath}) 460 461 err := volumeMount.Volume.InitializeTaskCache( 462 logger, 463 step.metadata.JobID, 464 step.plan.Name, 465 cacheConfig.Path, 466 bool(step.plan.Privileged)) 467 if err != nil { 468 return err 469 } 470 471 continue 472 } 473 } 474 } 475 return nil 476} 477 478type taskInput struct { 479 config atc.TaskInputConfig 480 artifact runtime.Artifact 481 artifactsRoot string 482} 483 484func (s taskInput) Artifact() runtime.Artifact { return s.artifact } 485 486func (s taskInput) Path() string { 487 subdir := s.config.Path 488 if s.config.Path == "" { 489 subdir = s.config.Name 490 } 491 492 return filepath.Join(s.artifactsRoot, subdir) 493} 494 495func artifactsPath(outputConfig atc.TaskOutputConfig, artifactsRoot string) string { 
496 outputSrc := outputConfig.Path 497 if len(outputSrc) == 0 { 498 outputSrc = outputConfig.Name 499 } 500 501 return path.Join(artifactsRoot, outputSrc) + "/" 502} 503 504type taskCacheInput struct { 505 artifact runtime.Artifact 506 artifactsRoot string 507 cachePath string 508} 509 510func (s taskCacheInput) Artifact() runtime.Artifact { return s.artifact } 511 512func (s taskCacheInput) Path() string { 513 return filepath.Join(s.artifactsRoot, s.cachePath) 514} 515