1package worker_test 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "path" 9 "time" 10 11 "code.cloudfoundry.org/garden" 12 "code.cloudfoundry.org/garden/gardenfakes" 13 "github.com/concourse/concourse/atc" 14 "github.com/concourse/concourse/atc/compression/compressionfakes" 15 "github.com/concourse/concourse/atc/db/dbfakes" 16 "github.com/concourse/concourse/atc/db/lock/lockfakes" 17 "github.com/concourse/concourse/atc/exec/execfakes" 18 "github.com/concourse/concourse/atc/resource/resourcefakes" 19 "github.com/concourse/concourse/atc/runtime" 20 "github.com/concourse/concourse/atc/runtime/runtimefakes" 21 "github.com/onsi/gomega/gbytes" 22 23 "code.cloudfoundry.org/lager/lagertest" 24 "github.com/concourse/baggageclaim" 25 "github.com/concourse/concourse/atc/db" 26 "github.com/concourse/concourse/atc/worker" 27 "github.com/concourse/concourse/atc/worker/workerfakes" 28 29 . "github.com/onsi/ginkgo" 30 . "github.com/onsi/gomega" 31) 32 33var _ = Describe("Client", func() { 34 var ( 35 logger *lagertest.TestLogger 36 fakePool *workerfakes.FakePool 37 fakeProvider *workerfakes.FakeWorkerProvider 38 client worker.Client 39 fakeLock *lockfakes.FakeLock 40 fakeLockFactory *lockfakes.FakeLockFactory 41 fakeCompression *compressionfakes.FakeCompression 42 ) 43 44 BeforeEach(func() { 45 logger = lagertest.NewTestLogger("test") 46 fakePool = new(workerfakes.FakePool) 47 fakeProvider = new(workerfakes.FakeWorkerProvider) 48 fakeCompression = new(compressionfakes.FakeCompression) 49 workerPolling := 1 * time.Second 50 workerStatus := 2 * time.Second 51 52 client = worker.NewClient(fakePool, fakeProvider, fakeCompression, workerPolling, workerStatus) 53 }) 54 55 Describe("FindContainer", func() { 56 var ( 57 foundContainer worker.Container 58 found bool 59 findErr error 60 ) 61 62 JustBeforeEach(func() { 63 foundContainer, found, findErr = client.FindContainer( 64 logger, 65 4567, 66 "some-handle", 67 ) 68 }) 69 70 Context("when looking up the worker errors", func() { 71 BeforeEach(func() { 72 fakeProvider.FindWorkerForContainerReturns(nil, false, errors.New("nope")) 73 }) 74 75 It("errors", func() { 76 Expect(findErr).To(HaveOccurred()) 77 }) 78 }) 79 80 Context("when worker is not found", func() { 81 BeforeEach(func() { 82 fakeProvider.FindWorkerForContainerReturns(nil, false, nil) 83 }) 84 85 It("returns not found", func() { 86 Expect(findErr).NotTo(HaveOccurred()) 87 Expect(found).To(BeFalse()) 88 }) 89 }) 90 91 Context("when a worker is found with the container", func() { 92 var fakeWorker *workerfakes.FakeWorker 93 var fakeContainer *workerfakes.FakeContainer 94 95 BeforeEach(func() { 96 fakeWorker = new(workerfakes.FakeWorker) 97 fakeProvider.FindWorkerForContainerReturns(fakeWorker, true, nil) 98 99 fakeContainer = new(workerfakes.FakeContainer) 100 fakeWorker.FindContainerByHandleReturns(fakeContainer, true, nil) 101 }) 102 103 It("succeeds", func() { 104 Expect(found).To(BeTrue()) 105 Expect(findErr).NotTo(HaveOccurred()) 106 }) 107 108 It("returns the created container", func() { 109 Expect(foundContainer).To(Equal(fakeContainer)) 110 }) 111 }) 112 }) 113 114 Describe("FindVolume", func() { 115 var ( 116 foundVolume worker.Volume 117 found bool 118 findErr error 119 ) 120 121 JustBeforeEach(func() { 122 foundVolume, found, findErr = client.FindVolume( 123 logger, 124 4567, 125 "some-handle", 126 ) 127 }) 128 129 Context("when looking up the worker errors", func() { 130 BeforeEach(func() { 131 fakeProvider.FindWorkerForVolumeReturns(nil, false, errors.New("nope")) 132 }) 133 134 It("errors", func() { 135 Expect(findErr).To(HaveOccurred()) 136 }) 137 }) 138 139 Context("when worker is not found", func() { 140 BeforeEach(func() { 141 fakeProvider.FindWorkerForVolumeReturns(nil, false, nil) 142 }) 143 144 It("returns not found", func() { 145 Expect(findErr).NotTo(HaveOccurred()) 146 Expect(found).To(BeFalse()) 147 }) 148 }) 149 150 Context("when a worker is found with the volume", func() { 151 var fakeWorker *workerfakes.FakeWorker 152 var fakeVolume *workerfakes.FakeVolume 153 154 BeforeEach(func() { 155 fakeWorker = new(workerfakes.FakeWorker) 156 fakeProvider.FindWorkerForVolumeReturns(fakeWorker, true, nil) 157 158 fakeVolume = new(workerfakes.FakeVolume) 159 fakeWorker.LookupVolumeReturns(fakeVolume, true, nil) 160 }) 161 162 It("succeeds", func() { 163 Expect(found).To(BeTrue()) 164 Expect(findErr).NotTo(HaveOccurred()) 165 }) 166 167 It("returns the volume", func() { 168 Expect(foundVolume).To(Equal(fakeVolume)) 169 }) 170 }) 171 }) 172 173 Describe("CreateVolume", func() { 174 var ( 175 fakeWorker *workerfakes.FakeWorker 176 volumeSpec worker.VolumeSpec 177 workerSpec worker.WorkerSpec 178 volumeType db.VolumeType 179 err error 180 ) 181 182 BeforeEach(func() { 183 volumeSpec = worker.VolumeSpec{ 184 Strategy: baggageclaim.EmptyStrategy{}, 185 } 186 187 workerSpec = worker.WorkerSpec{ 188 TeamID: 1, 189 } 190 191 volumeType = db.VolumeTypeArtifact 192 }) 193 194 JustBeforeEach(func() { 195 _, err = client.CreateVolume(logger, volumeSpec, workerSpec, volumeType) 196 }) 197 198 Context("when no workers can be found", func() { 199 BeforeEach(func() { 200 fakePool.FindOrChooseWorkerReturns(nil, errors.New("nope")) 201 }) 202 203 It("returns an error", func() { 204 Expect(err).To(HaveOccurred()) 205 }) 206 }) 207 208 Context("when the worker can be found", func() { 209 BeforeEach(func() { 210 fakeWorker = new(workerfakes.FakeWorker) 211 fakePool.FindOrChooseWorkerReturns(fakeWorker, nil) 212 }) 213 214 It("creates the volume on the worker", func() { 215 Expect(err).ToNot(HaveOccurred()) 216 Expect(fakeWorker.CreateVolumeCallCount()).To(Equal(1)) 217 l, spec, id, t := fakeWorker.CreateVolumeArgsForCall(0) 218 Expect(l).To(Equal(logger)) 219 Expect(spec).To(Equal(volumeSpec)) 220 Expect(id).To(Equal(1)) 221 Expect(t).To(Equal(volumeType)) 222 }) 223 }) 224 }) 225 226 Describe("RunCheckStep", func() { 227 228 var ( 229 result worker.CheckResult 230 err, expectedErr error 231 fakeResource *resourcefakes.FakeResource 232 fakeDelegate *execfakes.FakeBuildStepDelegate 233 ) 234 235 BeforeEach(func() { 236 fakeResource = new(resourcefakes.FakeResource) 237 fakeDelegate = new(execfakes.FakeBuildStepDelegate) 238 }) 239 240 JustBeforeEach(func() { 241 owner := new(dbfakes.FakeContainerOwner) 242 containerSpec := worker.ContainerSpec{} 243 fakeStrategy := new(workerfakes.FakeContainerPlacementStrategy) 244 workerSpec := worker.WorkerSpec{} 245 fakeResourceTypes := atc.VersionedResourceTypes{} 246 247 imageSpec := worker.ImageFetcherSpec{ 248 Delegate: fakeDelegate, 249 ResourceTypes: fakeResourceTypes, 250 } 251 252 result, err = client.RunCheckStep( 253 context.Background(), 254 logger, 255 owner, 256 containerSpec, 257 workerSpec, 258 fakeStrategy, 259 metadata, 260 imageSpec, 261 1*time.Nanosecond, 262 fakeResource, 263 ) 264 }) 265 266 Context("faling to find worker for container", func() { 267 BeforeEach(func() { 268 expectedErr = errors.New("find-worker-err") 269 270 fakePool.FindOrChooseWorkerForContainerReturns(nil, expectedErr) 271 }) 272 273 It("errors", func() { 274 Expect(err).To(HaveOccurred()) 275 Expect(errors.Is(err, expectedErr)).To(BeTrue()) 276 }) 277 }) 278 279 Context("having found a worker", func() { 280 var fakeWorker *workerfakes.FakeWorker 281 282 BeforeEach(func() { 283 fakeWorker = new(workerfakes.FakeWorker) 284 fakePool.FindOrChooseWorkerForContainerReturns(fakeWorker, nil) 285 }) 286 287 Context("failing to find or create container in the worker", func() { 288 BeforeEach(func() { 289 expectedErr = errors.New("find-or-create-container-err") 290 fakeWorker.FindOrCreateContainerReturns(nil, expectedErr) 291 }) 292 293 It("errors", func() { 294 Expect(errors.Is(err, expectedErr)).To(BeTrue()) 295 }) 296 }) 297 298 Context("having found a container", func() { 299 var fakeContainer *workerfakes.FakeContainer 300 301 BeforeEach(func() { 302 fakeContainer = new(workerfakes.FakeContainer) 303 fakeWorker.FindOrCreateContainerReturns(fakeContainer, nil) 304 }) 305 306 Context("check failing", func() { 307 BeforeEach(func() { 308 expectedErr = errors.New("check-err") 309 fakeResource.CheckReturns(nil, expectedErr) 310 }) 311 312 It("errors", func() { 313 Expect(errors.Is(err, expectedErr)).To(BeTrue()) 314 }) 315 }) 316 317 It("runs check w/ timeout", func() { 318 ctx, _, _ := fakeResource.CheckArgsForCall(0) 319 _, hasDeadline := ctx.Deadline() 320 321 Expect(hasDeadline).To(BeTrue()) 322 }) 323 324 It("uses the right executable path in the proc spec", func() { 325 _, processSpec, _ := fakeResource.CheckArgsForCall(0) 326 327 Expect(processSpec).To(Equal(runtime.ProcessSpec{ 328 Path: "/opt/resource/check", 329 })) 330 }) 331 332 It("uses the container as the runner", func() { 333 _, _, container := fakeResource.CheckArgsForCall(0) 334 335 Expect(container).To(Equal(fakeContainer)) 336 }) 337 338 Context("succeeding", func() { 339 BeforeEach(func() { 340 fakeResource.CheckReturns([]atc.Version{ 341 {"version": "1"}, 342 }, nil) 343 }) 344 345 It("returns the versions", func() { 346 Expect(result.Versions).To(HaveLen(1)) 347 Expect(result.Versions[0]).To(Equal(atc.Version{"version": "1"})) 348 }) 349 }) 350 }) 351 }) 352 353 }) 354 355 Describe("RunGetStep", func() { 356 357 var ( 358 ctx context.Context 359 owner db.ContainerOwner 360 containerSpec worker.ContainerSpec 361 workerSpec worker.WorkerSpec 362 metadata db.ContainerMetadata 363 imageSpec worker.ImageFetcherSpec 364 fakeChosenWorker *workerfakes.FakeWorker 365 fakeStrategy *workerfakes.FakeContainerPlacementStrategy 366 fakeDelegate *workerfakes.FakeImageFetchingDelegate 367 fakeEventDelegate *runtimefakes.FakeStartingEventDelegate 368 fakeResourceTypes atc.VersionedResourceTypes 369 fakeContainer *workerfakes.FakeContainer 370 fakeProcessSpec runtime.ProcessSpec 371 fakeResource *resourcefakes.FakeResource 372 fakeUsedResourceCache *dbfakes.FakeUsedResourceCache 373 374 err error 375 376 disasterErr error 377 378 result worker.GetResult 379 ) 380 381 BeforeEach(func() { 382 ctx, _ = context.WithCancel(context.Background()) 383 owner = new(dbfakes.FakeContainerOwner) 384 containerSpec = worker.ContainerSpec{} 385 fakeStrategy = new(workerfakes.FakeContainerPlacementStrategy) 386 workerSpec = worker.WorkerSpec{} 387 fakeChosenWorker = new(workerfakes.FakeWorker) 388 fakeDelegate = new(workerfakes.FakeImageFetchingDelegate) 389 fakeEventDelegate = new(runtimefakes.FakeStartingEventDelegate) 390 fakeResourceTypes = atc.VersionedResourceTypes{} 391 imageSpec = worker.ImageFetcherSpec{ 392 Delegate: fakeDelegate, 393 ResourceTypes: fakeResourceTypes, 394 } 395 396 fakeResource = new(resourcefakes.FakeResource) 397 fakeContainer = new(workerfakes.FakeContainer) 398 disasterErr = errors.New("oh no") 399 stdout := new(gbytes.Buffer) 400 stderr := new(gbytes.Buffer) 401 fakeProcessSpec = runtime.ProcessSpec{ 402 Path: "/opt/resource/out", 403 StdoutWriter: stdout, 404 StderrWriter: stderr, 405 } 406 fakeUsedResourceCache = new(dbfakes.FakeUsedResourceCache) 407 408 fakeChosenWorker = new(workerfakes.FakeWorker) 409 fakeChosenWorker.NameReturns("some-worker") 410 fakeChosenWorker.SatisfiesReturns(true) 411 fakeChosenWorker.FindOrCreateContainerReturns(fakeContainer, nil) 412 fakePool.FindOrChooseWorkerForContainerReturns(fakeChosenWorker, nil) 413 414 }) 415 416 JustBeforeEach(func() { 417 result, err = client.RunGetStep( 418 ctx, 419 logger, 420 owner, 421 containerSpec, 422 workerSpec, 423 fakeStrategy, 424 metadata, 425 imageSpec, 426 fakeProcessSpec, 427 fakeEventDelegate, 428 fakeUsedResourceCache, 429 fakeResource, 430 ) 431 }) 432 433 It("finds/chooses a worker", func() { 434 Expect(err).ToNot(HaveOccurred()) 435 436 Expect(fakePool.FindOrChooseWorkerForContainerCallCount()).To(Equal(1)) 437 438 _, _, actualOwner, actualContainerSpec, actualWorkerSpec, actualStrategy := fakePool.FindOrChooseWorkerForContainerArgsForCall(0) 439 Expect(actualOwner).To(Equal(owner)) 440 Expect(actualContainerSpec).To(Equal(containerSpec)) 441 Expect(actualWorkerSpec).To(Equal(workerSpec)) 442 Expect(actualStrategy).To(Equal(fakeStrategy)) 443 }) 444 445 It("invokes the SelectedWorker Event on the delegate", func() { 446 Expect(fakeEventDelegate.SelectedWorkerCallCount()).Should((Equal(1))) 447 }) 448 449 Context("worker is chosen", func() { 450 BeforeEach(func() { 451 fakePool.FindOrChooseWorkerReturns(fakeChosenWorker, nil) 452 }) 453 454 It("invokes the Starting Event on the delegate", func() { 455 Expect(fakeEventDelegate.StartingCallCount()).Should((Equal(1))) 456 _, actualWorkerName := fakeEventDelegate.SelectedWorkerArgsForCall(0) 457 Expect(actualWorkerName).To(Equal(fakeChosenWorker.Name())) 458 }) 459 460 It("calls Fetch on the worker", func() { 461 Expect(fakeChosenWorker.FetchCallCount()).To(Equal(1)) 462 _, _, actualMetadata, actualChosenWorker, actualContainerSpec, actualProcessSpec, actualResource, actualOwner, actualImageFetcherSpec, actualResourceCache, actualLockName := fakeChosenWorker.FetchArgsForCall(0) 463 464 Expect(actualMetadata).To(Equal(metadata)) 465 Expect(actualChosenWorker).To(Equal(fakeChosenWorker)) 466 Expect(actualContainerSpec).To(Equal(containerSpec)) 467 Expect(actualProcessSpec).To(Equal(fakeProcessSpec)) 468 Expect(actualResource).To(Equal(fakeResource)) 469 Expect(actualOwner).To(Equal(owner)) 470 Expect(actualImageFetcherSpec).To(Equal(imageSpec)) 471 Expect(actualResourceCache).To(Equal(fakeUsedResourceCache)) 472 // Computed SHA 473 Expect(actualLockName).To(Equal("18c3de3f8ea112ba52e01f279b6cc62335b4bec2f359b9be7636a5ad7bf98f8c")) 474 }) 475 }) 476 477 Context("Worker selection returns an error", func() { 478 BeforeEach(func() { 479 fakePool.FindOrChooseWorkerForContainerReturns(nil, disasterErr) 480 }) 481 482 It("Returns the error", func() { 483 Expect(err).To(HaveOccurred()) 484 Expect(err).To(Equal(disasterErr)) 485 486 Expect(result).To(Equal(worker.GetResult{})) 487 }) 488 }) 489 490 Context("Calling chosenWorker.Fetch", func() { 491 var ( 492 someError error 493 someGetResult worker.GetResult 494 fakeVolume *workerfakes.FakeVolume 495 ) 496 BeforeEach(func() { 497 someGetResult = worker.GetResult{ 498 ExitStatus: 0, 499 VersionResult: runtime.VersionResult{ 500 atc.Version{"some-version": "some-value"}, 501 []atc.MetadataField{{"foo", "bar"}}, 502 }, 503 } 504 someError = errors.New("some-foo-error") 505 fakeVolume = new(workerfakes.FakeVolume) 506 fakeChosenWorker.FetchReturns(someGetResult, fakeVolume, someError) 507 }) 508 It("returns getResult & err", func() { 509 Expect(result).To(Equal(someGetResult)) 510 Expect(err).To(Equal(someError)) 511 }) 512 }) 513 }) 514 515 Describe("RunTaskStep", func() { 516 var ( 517 status int 518 volumeMounts []worker.VolumeMount 519 inputSources []worker.InputSource 520 taskResult worker.TaskResult 521 err error 522 523 fakeWorker *workerfakes.FakeWorker 524 fakeContainerOwner db.ContainerOwner 525 fakeWorkerSpec worker.WorkerSpec 526 fakeContainerSpec worker.ContainerSpec 527 fakeStrategy *workerfakes.FakeContainerPlacementStrategy 528 fakeMetadata db.ContainerMetadata 529 fakeDelegate *execfakes.FakeTaskDelegate 530 fakeImageFetcherSpec worker.ImageFetcherSpec 531 fakeTaskProcessSpec runtime.ProcessSpec 532 fakeContainer *workerfakes.FakeContainer 533 fakeEventDelegate *runtimefakes.FakeStartingEventDelegate 534 535 ctx context.Context 536 cancel func() 537 ) 538 539 BeforeEach(func() { 540 cpu := uint64(1024) 541 memory := uint64(1024) 542 543 buildId := 1234 544 planId := atc.PlanID("42") 545 teamId := 123 546 fakeDelegate = new(execfakes.FakeTaskDelegate) 547 fakeContainerOwner = db.NewBuildStepContainerOwner( 548 buildId, 549 planId, 550 teamId, 551 ) 552 fakeWorkerSpec = worker.WorkerSpec{} 553 fakeContainerSpec = worker.ContainerSpec{ 554 Platform: "some-platform", 555 Tags: []string{"step", "tags"}, 556 TeamID: 123, 557 ImageSpec: worker.ImageSpec{ 558 ImageResource: &worker.ImageResource{ 559 Type: "docker", 560 Source: atc.Source{"some": "secret-source-param"}, 561 Params: atc.Params{"some": "params"}, 562 Version: atc.Version{"some": "version"}, 563 }, 564 Privileged: false, 565 }, 566 Limits: worker.ContainerLimits{ 567 CPU: &cpu, 568 Memory: &memory, 569 }, 570 Dir: "some-artifact-root", 571 Env: []string{"SECURE=secret-task-param"}, 572 ArtifactByPath: map[string]runtime.Artifact{}, 573 Inputs: inputSources, 574 Outputs: worker.OutputPaths{}, 575 } 576 fakeStrategy = new(workerfakes.FakeContainerPlacementStrategy) 577 fakeMetadata = db.ContainerMetadata{ 578 WorkingDirectory: "some-artifact-root", 579 Type: db.ContainerTypeTask, 580 StepName: "some-step", 581 } 582 fakeImageFetcherSpec = worker.ImageFetcherSpec{ 583 Delegate: fakeDelegate, 584 ResourceTypes: atc.VersionedResourceTypes{}, 585 } 586 fakeTaskProcessSpec = runtime.ProcessSpec{ 587 Path: "/some/path", 588 Args: []string{"some", "args"}, 589 Dir: "/some/dir", 590 StdoutWriter: new(bytes.Buffer), 591 StderrWriter: new(bytes.Buffer), 592 } 593 fakeContainer = new(workerfakes.FakeContainer) 594 fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "0"}, nil) 595 596 fakeWorker = new(workerfakes.FakeWorker) 597 fakeWorker.NameReturns("some-worker") 598 fakeWorker.SatisfiesReturns(true) 599 fakeWorker.FindOrCreateContainerReturns(fakeContainer, nil) 600 601 fakeWorker.IncreaseActiveTasksStub = func() error { 602 fakeWorker.ActiveTasksReturns(1, nil) 603 return nil 604 } 605 606 fakeWorker.DecreaseActiveTasksStub = func() error { 607 fakeWorker.ActiveTasksReturns(0, nil) 608 return nil 609 } 610 611 fakeEventDelegate = new(runtimefakes.FakeStartingEventDelegate) 612 613 fakeLockFactory = new(lockfakes.FakeLockFactory) 614 fakeLock = new(lockfakes.FakeLock) 615 fakeLockFactory.AcquireReturns(fakeLock, true, nil) 616 617 fakePool.FindOrChooseWorkerForContainerReturns(fakeWorker, nil) 618 ctx, cancel = context.WithCancel(context.Background()) 619 }) 620 621 JustBeforeEach(func() { 622 taskResult, err = client.RunTaskStep( 623 ctx, 624 logger, 625 fakeContainerOwner, 626 fakeContainerSpec, 627 fakeWorkerSpec, 628 fakeStrategy, 629 fakeMetadata, 630 fakeImageFetcherSpec, 631 fakeTaskProcessSpec, 632 fakeEventDelegate, 633 fakeLockFactory, 634 ) 635 status = taskResult.ExitStatus 636 volumeMounts = taskResult.VolumeMounts 637 }) 638 639 Context("choosing a worker", func() { 640 BeforeEach(func() { 641 // later fakes are uninitialized 642 fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "3"}, nil) 643 }) 644 645 It("chooses a worker", func() { 646 Expect(err).ToNot(HaveOccurred()) 647 Expect(fakePool.FindOrChooseWorkerForContainerCallCount()).To(Equal(1)) 648 _, actualWorkerName := fakeEventDelegate.SelectedWorkerArgsForCall(0) 649 Expect(actualWorkerName).To(Equal(fakeWorker.Name())) 650 }) 651 652 It("invokes the SelectedWorker Event on the delegate", func() { 653 Expect(fakeEventDelegate.SelectedWorkerCallCount()).Should((Equal(1))) 654 }) 655 656 Context("when 'limit-active-tasks' strategy is chosen", func() { 657 BeforeEach(func() { 658 fakeStrategy.ModifiesActiveTasksReturns(true) 659 }) 660 Context("when a worker is found", func() { 661 BeforeEach(func() { 662 fakeWorker.NameReturns("some-worker") 663 fakePool.FindOrChooseWorkerForContainerReturns(fakeWorker, nil) 664 665 fakeContainer := new(workerfakes.FakeContainer) 666 fakeWorker.FindOrCreateContainerReturns(fakeContainer, nil) 667 fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "0"}, nil) 668 }) 669 It("increase the active tasks on the worker", func() { 670 Expect(fakeWorker.IncreaseActiveTasksCallCount()).To(Equal(1)) 671 }) 672 673 Context("when the container is already present on the worker", func() { 674 BeforeEach(func() { 675 fakePool.ContainerInWorkerReturns(true, nil) 676 }) 677 It("does not increase the active tasks on the worker", func() { 678 Expect(fakeWorker.IncreaseActiveTasksCallCount()).To(Equal(0)) 679 }) 680 }) 681 }) 682 683 Context("when the task is aborted waiting for an available worker", func() { 684 BeforeEach(func() { 685 cancel() 686 }) 687 It("exits releasing the lock", func() { 688 Expect(err.Error()).To(ContainSubstring(context.Canceled.Error())) 689 Expect(fakeLock.ReleaseCallCount()).To(Equal(fakeLockFactory.AcquireCallCount())) 690 }) 691 }) 692 693 Context("when a container in worker returns an error", func() { 694 BeforeEach(func() { 695 fakePool.ContainerInWorkerReturns(false, errors.New("nope")) 696 }) 697 It("release the task-step lock every time it acquires it", func() { 698 Expect(fakeLock.ReleaseCallCount()).To(Equal(fakeLockFactory.AcquireCallCount())) 699 }) 700 }) 701 }) 702 703 Context("when finding or choosing the worker errors", func() { 704 workerDisaster := errors.New("worker selection errored") 705 706 BeforeEach(func() { 707 fakePool.FindOrChooseWorkerForContainerReturns(nil, workerDisaster) 708 }) 709 710 It("returns the error", func() { 711 Expect(err).To(Equal(workerDisaster)) 712 }) 713 714 It("should not invokes the SelectedWorker Event on the delegate", func() { 715 Expect(fakeEventDelegate.SelectedWorkerCallCount()).Should((Equal(0))) 716 }) 717 }) 718 719 }) 720 721 It("finds or creates a container", func() { 722 Expect(fakeWorker.FindOrCreateContainerCallCount()).To(Equal(1)) 723 _, cancel, delegate, owner, createdMetadata, containerSpec, _ := fakeWorker.FindOrCreateContainerArgsForCall(0) 724 Expect(containerSpec.Inputs).To(Equal(fakeContainerSpec.Inputs)) 725 Expect(containerSpec).To(Equal(fakeContainerSpec)) 726 Expect(cancel).ToNot(BeNil()) 727 Expect(owner).To(Equal(fakeContainerOwner)) 728 Expect(delegate).To(Equal(fakeDelegate)) 729 Expect(createdMetadata).To(Equal(db.ContainerMetadata{ 730 WorkingDirectory: "some-artifact-root", 731 Type: db.ContainerTypeTask, 732 StepName: "some-step", 733 })) 734 }) 735 736 Context("found a container that has already exited", func() { 737 BeforeEach(func() { 738 fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "8"}, nil) 739 }) 740 741 It("does not attach to any process", func() { 742 Expect(fakeContainer.AttachCallCount()).To(BeZero()) 743 }) 744 745 It("returns result of container process", func() { 746 Expect(err).ToNot(HaveOccurred()) 747 Expect(status).To(Equal(8)) 748 }) 749 750 Context("when 'limit-active-tasks' strategy is chosen", func() { 751 BeforeEach(func() { 752 fakeStrategy.ModifiesActiveTasksReturns(true) 753 }) 754 755 It("decrements the active tasks counter on the worker", func() { 756 Expect(fakeWorker.ActiveTasks()).To(Equal(0)) 757 }) 758 }) 759 760 Context("when volumes are configured and present on the container", func() { 761 var ( 762 fakeMountPath1 = "some-artifact-root/some-output-configured-path/" 763 fakeMountPath2 = "some-artifact-root/some-other-output/" 764 fakeMountPath3 = "some-artifact-root/some-output-configured-path-with-trailing-slash/" 765 766 fakeVolume1 *workerfakes.FakeVolume 767 fakeVolume2 *workerfakes.FakeVolume 768 fakeVolume3 *workerfakes.FakeVolume 769 ) 770 771 BeforeEach(func() { 772 fakeVolume1 = new(workerfakes.FakeVolume) 773 fakeVolume1.HandleReturns("some-handle-1") 774 fakeVolume2 = new(workerfakes.FakeVolume) 775 fakeVolume2.HandleReturns("some-handle-2") 776 fakeVolume3 = new(workerfakes.FakeVolume) 777 fakeVolume3.HandleReturns("some-handle-3") 778 779 fakeContainer.VolumeMountsReturns([]worker.VolumeMount{ 780 { 781 Volume: fakeVolume1, 782 MountPath: fakeMountPath1, 783 }, 784 { 785 Volume: fakeVolume2, 786 MountPath: fakeMountPath2, 787 }, 788 { 789 Volume: fakeVolume3, 790 MountPath: fakeMountPath3, 791 }, 792 }) 793 }) 794 795 It("returns all the volume mounts", func() { 796 Expect(volumeMounts).To(ConsistOf( 797 worker.VolumeMount{ 798 Volume: fakeVolume1, 799 MountPath: fakeMountPath1, 800 }, 801 worker.VolumeMount{ 802 Volume: fakeVolume2, 803 MountPath: fakeMountPath2, 804 }, 805 worker.VolumeMount{ 806 Volume: fakeVolume3, 807 MountPath: fakeMountPath3, 808 }, 809 )) 810 }) 811 }) 812 }) 813 814 Context("container has not already exited", func() { 815 var ( 816 fakeProcess *gardenfakes.FakeProcess 817 fakeProcessExitCode int 818 819 fakeMountPath1 = "some-artifact-root/some-output-configured-path/" 820 fakeMountPath2 = "some-artifact-root/some-other-output/" 821 fakeMountPath3 = "some-artifact-root/some-output-configured-path-with-trailing-slash/" 822 823 fakeVolume1 *workerfakes.FakeVolume 824 fakeVolume2 *workerfakes.FakeVolume 825 fakeVolume3 *workerfakes.FakeVolume 826 827 stdoutBuf *gbytes.Buffer 828 stderrBuf *gbytes.Buffer 829 ) 830 831 BeforeEach(func() { 832 fakeProcess = new(gardenfakes.FakeProcess) 833 fakeContainer.PropertiesReturns(garden.Properties{}, nil) 834 835 // for testing volume mounts being returned 836 fakeVolume1 = new(workerfakes.FakeVolume) 837 fakeVolume1.HandleReturns("some-handle-1") 838 fakeVolume2 = new(workerfakes.FakeVolume) 839 fakeVolume2.HandleReturns("some-handle-2") 840 fakeVolume3 = new(workerfakes.FakeVolume) 841 fakeVolume3.HandleReturns("some-handle-3") 842 843 fakeContainer.VolumeMountsReturns([]worker.VolumeMount{ 844 { 845 Volume: fakeVolume1, 846 MountPath: fakeMountPath1, 847 }, 848 { 849 Volume: fakeVolume2, 850 MountPath: fakeMountPath2, 851 }, 852 { 853 Volume: fakeVolume3, 854 MountPath: fakeMountPath3, 855 }, 856 }) 857 }) 858 859 Context("found container that is already running", func() { 860 BeforeEach(func() { 861 fakeContainer.AttachReturns(fakeProcess, nil) 862 863 stdoutBuf = new(gbytes.Buffer) 864 stderrBuf = new(gbytes.Buffer) 865 fakeTaskProcessSpec = runtime.ProcessSpec{ 866 StdoutWriter: stdoutBuf, 867 StderrWriter: stderrBuf, 868 } 869 }) 870 871 It("does not create a new container", func() { 872 Expect(fakeContainer.RunCallCount()).To(BeZero()) 873 }) 874 875 It("attaches to the running process", func() { 876 Expect(err).ToNot(HaveOccurred()) 877 Expect(fakeContainer.AttachCallCount()).To(Equal(1)) 878 Expect(fakeContainer.RunCallCount()).To(Equal(0)) 879 _, _, actualProcessIO := fakeContainer.AttachArgsForCall(0) 880 Expect(actualProcessIO.Stdout).To(Equal(stdoutBuf)) 881 Expect(actualProcessIO.Stderr).To(Equal(stderrBuf)) 882 }) 883 884 Context("when the process is interrupted", func() { 885 var stopped chan struct{} 886 BeforeEach(func() { 887 stopped = make(chan struct{}) 888 889 fakeProcess.WaitStub = func() (int, error) { 890 defer GinkgoRecover() 891 892 <-stopped 893 return 128 + 15, nil 894 } 895 896 fakeContainer.StopStub = func(bool) error { 897 close(stopped) 898 return nil 899 } 900 901 cancel() 902 }) 903 904 It("stops the container", func() { 905 Expect(fakeContainer.StopCallCount()).To(Equal(1)) 906 Expect(fakeContainer.StopArgsForCall(0)).To(BeFalse()) 907 Expect(err).To(Equal(context.Canceled)) 908 }) 909 910 Context("when container.stop returns an error", func() { 911 var disaster error 912 913 BeforeEach(func() { 914 disaster = errors.New("gotta get away") 915 916 fakeContainer.StopStub = func(bool) error { 917 close(stopped) 918 return disaster 919 } 920 }) 921 922 It("doesn't return the error", func() { 923 Expect(err).To(Equal(context.Canceled)) 924 }) 925 }) 926 927 Context("when 'limit-active-tasks' strategy is chosen", func() { 928 BeforeEach(func() { 929 fakeStrategy.ModifiesActiveTasksReturns(true) 930 }) 931 932 It("decrements the active tasks counter on the worker", func() { 933 Expect(fakeWorker.ActiveTasks()).To(Equal(0)) 934 }) 935 }) 936 }) 937 938 Context("when the process exits successfully", func() { 939 BeforeEach(func() { 940 fakeProcessExitCode = 0 941 fakeProcess.WaitReturns(fakeProcessExitCode, nil) 942 }) 943 It("returns a successful result", func() { 944 Expect(status).To(BeZero()) 945 Expect(err).ToNot(HaveOccurred()) 946 }) 947 948 It("returns all the volume mounts", func() { 949 Expect(volumeMounts).To(ConsistOf( 950 worker.VolumeMount{ 951 Volume: fakeVolume1, 952 MountPath: fakeMountPath1, 953 }, 954 worker.VolumeMount{ 955 Volume: fakeVolume2, 956 MountPath: fakeMountPath2, 957 }, 958 worker.VolumeMount{ 959 Volume: fakeVolume3, 960 MountPath: fakeMountPath3, 961 }, 962 )) 963 }) 964 965 Context("when 'limit-active-tasks' strategy is chosen", func() { 966 BeforeEach(func() { 967 fakeStrategy.ModifiesActiveTasksReturns(true) 968 }) 969 970 It("decrements the active tasks counter on the worker", func() { 971 Expect(fakeWorker.ActiveTasks()).To(Equal(0)) 972 }) 973 }) 974 }) 975 976 Context("when the process exits with an error", func() { 977 disaster := errors.New("process failed") 978 BeforeEach(func() { 979 fakeProcessExitCode = 128 + 15 980 fakeProcess.WaitReturns(fakeProcessExitCode, disaster) 981 }) 982 It("returns an unsuccessful result", func() { 983 Expect(status).To(Equal(fakeProcessExitCode)) 984 Expect(err).To(HaveOccurred()) 985 Expect(err).To(Equal(disaster)) 986 }) 987 988 It("returns no volume mounts", func() { 989 Expect(volumeMounts).To(BeEmpty()) 990 }) 991 992 Context("when 'limit-active-tasks' strategy is chosen", func() { 993 BeforeEach(func() { 994 fakeStrategy.ModifiesActiveTasksReturns(true) 995 }) 996 997 It("decrements the active tasks counter on the worker", func() { 998 Expect(fakeWorker.ActiveTasks()).To(Equal(0)) 999 }) 1000 }) 1001 }) 1002 }) 1003 1004 Context("created a new container", func() { 1005 BeforeEach(func() { 1006 fakeContainer.AttachReturns(nil, errors.New("container not running")) 1007 fakeContainer.RunReturns(fakeProcess, nil) 1008 1009 stdoutBuf = new(gbytes.Buffer) 1010 stderrBuf = new(gbytes.Buffer) 1011 fakeTaskProcessSpec = runtime.ProcessSpec{ 1012 StdoutWriter: stdoutBuf, 1013 StderrWriter: stderrBuf, 1014 } 1015 }) 1016 1017 It("runs a new process in the container", func() { 1018 Eventually(fakeContainer.RunCallCount()).Should(Equal(1)) 1019 1020 _, gardenProcessSpec, actualProcessIO := fakeContainer.RunArgsForCall(0) 1021 Expect(gardenProcessSpec.ID).To(Equal("task")) 1022 Expect(gardenProcessSpec.Path).To(Equal(fakeTaskProcessSpec.Path)) 1023 Expect(gardenProcessSpec.Args).To(ConsistOf(fakeTaskProcessSpec.Args)) 1024 Expect(gardenProcessSpec.Dir).To(Equal(path.Join(fakeMetadata.WorkingDirectory, fakeTaskProcessSpec.Dir))) 1025 Expect(gardenProcessSpec.TTY).To(Equal(&garden.TTYSpec{WindowSize: &garden.WindowSize{Columns: 500, Rows: 500}})) 1026 Expect(actualProcessIO.Stdout).To(Equal(stdoutBuf)) 1027 Expect(actualProcessIO.Stderr).To(Equal(stderrBuf)) 1028 }) 1029 1030 It("invokes the Starting Event on the delegate", func() { 1031 Expect(fakeEventDelegate.StartingCallCount()).Should((Equal(1))) 1032 }) 1033 1034 Context("when the process is interrupted", func() { 1035 var stopped chan struct{} 1036 BeforeEach(func() { 1037 stopped = make(chan struct{}) 1038 1039 fakeProcess.WaitStub = func() (int, error) { 1040 defer GinkgoRecover() 1041 1042 <-stopped 1043 return 128 + 15, nil // wat? 1044 } 1045 1046 fakeContainer.StopStub = func(bool) error { 1047 close(stopped) 1048 return nil 1049 } 1050 1051 cancel() 1052 }) 1053 1054 It("stops the container", func() { 1055 Expect(fakeContainer.StopCallCount()).To(Equal(1)) 1056 Expect(fakeContainer.StopArgsForCall(0)).To(BeFalse()) 1057 Expect(err).To(Equal(context.Canceled)) 1058 }) 1059 1060 Context("when container.stop returns an error", func() { 1061 var disaster error 1062 1063 BeforeEach(func() { 1064 disaster = errors.New("gotta get away") 1065 1066 fakeContainer.StopStub = func(bool) error { 1067 close(stopped) 1068 return disaster 1069 } 1070 }) 1071 1072 It("doesn't return the error", func() { 1073 Expect(err).To(Equal(context.Canceled)) 1074 }) 1075 }) 1076 }) 1077 1078 Context("when the process exits successfully", func() { 1079 It("returns a successful result", func() { 1080 Expect(status).To(BeZero()) 1081 Expect(err).ToNot(HaveOccurred()) 1082 }) 1083 1084 It("saves the exit status property", func() { 1085 Expect(fakeContainer.SetPropertyCallCount()).To(Equal(1)) 1086 1087 name, value := fakeContainer.SetPropertyArgsForCall(0) 1088 Expect(name).To(Equal("concourse:exit-status")) 1089 Expect(value).To(Equal("0")) 1090 }) 1091 1092 Context("when saving the exit status succeeds", func() { 1093 BeforeEach(func() { 1094 fakeContainer.SetPropertyReturns(nil) 1095 }) 1096 1097 It("returns successfully", func() { 1098 Expect(err).ToNot(HaveOccurred()) 1099 }) 1100 }) 1101 1102 Context("when saving the exit status fails", func() { 1103 disaster := errors.New("nope") 1104 1105 BeforeEach(func() { 1106 fakeContainer.SetPropertyStub = func(name string, value string) error { 1107 defer GinkgoRecover() 1108 1109 if name == "concourse:exit-status" { 1110 return disaster 1111 } 1112 1113 return nil 1114 } 1115 }) 1116 1117 It("returns the error", func() { 1118 Expect(err).To(Equal(disaster)) 1119 }) 1120 }) 1121 1122 Context("when volumes are configured and present on the container", func() { 1123 var ( 1124 fakeMountPath1 = "some-artifact-root/some-output-configured-path/" 1125 fakeMountPath2 = "some-artifact-root/some-other-output/" 1126 fakeMountPath3 = "some-artifact-root/some-output-configured-path-with-trailing-slash/" 1127 1128 fakeVolume1 *workerfakes.FakeVolume 1129 fakeVolume2 *workerfakes.FakeVolume 1130 fakeVolume3 *workerfakes.FakeVolume 1131 ) 1132 1133 BeforeEach(func() { 1134 fakeVolume1 = new(workerfakes.FakeVolume) 1135 fakeVolume1.HandleReturns("some-handle-1") 1136 fakeVolume2 = new(workerfakes.FakeVolume) 1137 fakeVolume2.HandleReturns("some-handle-2") 1138 fakeVolume3 = new(workerfakes.FakeVolume) 1139 fakeVolume3.HandleReturns("some-handle-3") 1140 1141 fakeContainer.VolumeMountsReturns([]worker.VolumeMount{ 1142 { 1143 Volume: fakeVolume1, 1144 MountPath: fakeMountPath1, 1145 }, 1146 { 1147 Volume: fakeVolume2, 1148 MountPath: fakeMountPath2, 1149 }, 1150 { 1151 Volume: fakeVolume3, 1152 MountPath: fakeMountPath3, 1153 }, 1154 }) 1155 }) 1156 1157 It("returns all the volume mounts", func() { 1158 Expect(volumeMounts).To(ConsistOf( 1159 worker.VolumeMount{ 1160 Volume: fakeVolume1, 1161 MountPath: fakeMountPath1, 1162 }, 1163 worker.VolumeMount{ 1164 Volume: fakeVolume2, 1165 MountPath: fakeMountPath2, 1166 }, 1167 worker.VolumeMount{ 1168 Volume: fakeVolume3, 1169 MountPath: fakeMountPath3, 1170 }, 1171 )) 1172 }) 1173 1174 }) 1175 1176 Context("when 'limit-active-tasks' strategy is chosen", func() { 1177 BeforeEach(func() { 1178 fakeStrategy.ModifiesActiveTasksReturns(true) 1179 }) 1180 1181 It("decrements the active tasks counter on the worker", func() { 1182 Expect(fakeWorker.ActiveTasks()).To(Equal(0)) 1183 }) 1184 }) 1185 }) 1186 1187 Context("when the process exits on failure", func() { 1188 BeforeEach(func() { 1189 fakeProcessExitCode = 128 + 15 1190 fakeProcess.WaitReturns(fakeProcessExitCode, nil) 1191 }) 1192 It("returns an unsuccessful result", func() { 1193 Expect(status).To(Equal(fakeProcessExitCode)) 1194 Expect(err).ToNot(HaveOccurred()) 1195 }) 1196 1197 It("saves the exit status property", func() { 1198 Expect(fakeContainer.SetPropertyCallCount()).To(Equal(1)) 1199 1200 name, value := fakeContainer.SetPropertyArgsForCall(0) 1201 Expect(name).To(Equal("concourse:exit-status")) 1202 Expect(value).To(Equal(fmt.Sprint(fakeProcessExitCode))) 1203 }) 1204 1205 Context("when saving the exit status succeeds", func() { 1206 BeforeEach(func() { 1207 fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "0"}, nil) 1208 }) 1209 1210 It("returns successfully", func() { 1211 Expect(err).ToNot(HaveOccurred()) 1212 }) 1213 }) 1214 1215 Context("when saving the exit status fails", func() { 1216 disaster := errors.New("nope") 1217 1218 BeforeEach(func() { 1219 fakeContainer.SetPropertyStub = func(name string, value string) error { 1220 defer GinkgoRecover() 1221 1222 if name == "concourse:exit-status" { 1223 return disaster 1224 } 1225 1226 return nil 1227 } 1228 }) 1229 1230 It("returns the error", func() { 1231 Expect(err).To(Equal(disaster)) 1232 }) 1233 }) 1234 1235 It("returns all the volume mounts", func() { 1236 Expect(volumeMounts).To(ConsistOf( 1237 worker.VolumeMount{ 1238 Volume: fakeVolume1, 1239 MountPath: fakeMountPath1, 1240 }, 1241 worker.VolumeMount{ 1242 Volume: fakeVolume2, 1243 MountPath: fakeMountPath2, 1244 }, 1245 worker.VolumeMount{ 1246 Volume: fakeVolume3, 1247 MountPath: fakeMountPath3, 1248 }, 1249 )) 1250 }) 1251 1252 Context("when 'limit-active-tasks' strategy is chosen", func() { 1253 BeforeEach(func() { 1254 fakeStrategy.ModifiesActiveTasksReturns(true) 1255 }) 1256 1257 It("decrements the active tasks counter on the worker", func() { 1258 Expect(fakeWorker.ActiveTasks()).To(Equal(0)) 1259 }) 1260 }) 1261 }) 1262 1263 Context("when running the container fails with an error", func() { 1264 disaster := errors.New("nope") 1265 1266 BeforeEach(func() { 1267 fakeContainer.RunReturns(nil, disaster) 1268 }) 1269 1270 It("returns the error", func() { 1271 Expect(err).To(Equal(disaster)) 1272 }) 1273 1274 Context("when 'limit-active-tasks' strategy is chosen", func() { 1275 BeforeEach(func() { 1276 fakeStrategy.ModifiesActiveTasksReturns(true) 1277 }) 1278 1279 It("decrements the active tasks counter on the worker", func() { 1280 Expect(fakeWorker.ActiveTasks()).To(Equal(0)) 1281 }) 1282 }) 1283 }) 1284 }) 1285 }) 1286 }) 1287 1288 Describe("RunPutStep", func() { 1289 1290 var ( 1291 ctx context.Context 1292 owner db.ContainerOwner 1293 containerSpec worker.ContainerSpec 1294 workerSpec worker.WorkerSpec 1295 metadata db.ContainerMetadata 1296 imageSpec worker.ImageFetcherSpec 1297 fakeChosenWorker *workerfakes.FakeWorker 1298 fakeStrategy *workerfakes.FakeContainerPlacementStrategy 1299 fakeDelegate *workerfakes.FakeImageFetchingDelegate 1300 fakeEventDelegate *runtimefakes.FakeStartingEventDelegate 1301 fakeResourceTypes atc.VersionedResourceTypes 1302 fakeContainer *workerfakes.FakeContainer 1303 fakeProcessSpec runtime.ProcessSpec 1304 fakeResource *resourcefakes.FakeResource 1305 1306 versionResult runtime.VersionResult 1307 status int 1308 err error 1309 result worker.PutResult 1310 1311 disasterErr error 1312 ) 1313 1314 BeforeEach(func() { 1315 ctx = context.Background() 1316 owner = new(dbfakes.FakeContainerOwner) 1317 containerSpec = worker.ContainerSpec{} 1318 fakeStrategy = new(workerfakes.FakeContainerPlacementStrategy) 1319 workerSpec = worker.WorkerSpec{} 1320 fakeChosenWorker = new(workerfakes.FakeWorker) 1321 fakeDelegate = new(workerfakes.FakeImageFetchingDelegate) 1322 fakeEventDelegate = new(runtimefakes.FakeStartingEventDelegate) 1323 fakeResourceTypes = atc.VersionedResourceTypes{} 1324 imageSpec = worker.ImageFetcherSpec{ 1325 Delegate: fakeDelegate, 1326 ResourceTypes: fakeResourceTypes, 1327 } 1328 1329 fakeContainer = new(workerfakes.FakeContainer) 1330 disasterErr = errors.New("oh no") 1331 stdout := new(gbytes.Buffer) 1332 stderr := new(gbytes.Buffer) 1333 fakeProcessSpec = runtime.ProcessSpec{ 1334 Path: "/opt/resource/out", 1335 StdoutWriter: stdout, 1336 StderrWriter: stderr, 1337 } 1338 fakeResource = new(resourcefakes.FakeResource) 1339 1340 fakeChosenWorker = new(workerfakes.FakeWorker) 1341 fakeChosenWorker.NameReturns("some-worker") 1342 fakeChosenWorker.SatisfiesReturns(true) 1343 fakeChosenWorker.FindOrCreateContainerReturns(fakeContainer, nil) 1344 fakePool.FindOrChooseWorkerForContainerReturns(fakeChosenWorker, nil) 1345 1346 }) 1347 1348 JustBeforeEach(func() { 1349 result, err = client.RunPutStep( 1350 ctx, 1351 logger, 1352 owner, 1353 containerSpec, 1354 workerSpec, 1355 fakeStrategy, 1356 metadata, 1357 imageSpec, 1358 fakeProcessSpec, 1359 fakeEventDelegate, 1360 fakeResource, 1361 ) 1362 versionResult = result.VersionResult 1363 status = result.ExitStatus 1364 }) 1365 1366 It("finds/chooses a worker", func() { 1367 Expect(fakePool.FindOrChooseWorkerForContainerCallCount()).To(Equal(1)) 1368 1369 _, _, actualOwner, actualContainerSpec, actualWorkerSpec, strategy := fakePool.FindOrChooseWorkerForContainerArgsForCall(0) 1370 Expect(actualOwner).To(Equal(owner)) 1371 Expect(actualContainerSpec).To(Equal(containerSpec)) 1372 Expect(actualWorkerSpec).To(Equal(workerSpec)) 1373 Expect(strategy).To(Equal(fakeStrategy)) 1374 }) 1375 1376 It("invokes the SelectedWorker Event on the delegate", func() { 1377 Expect(fakeEventDelegate.SelectedWorkerCallCount()).Should((Equal(1))) 1378 _, actualWorkerName := fakeEventDelegate.SelectedWorkerArgsForCall(0) 1379 Expect(actualWorkerName).To(Equal(fakeChosenWorker.Name())) 1380 }) 1381 1382 Context("worker is chosen", func() { 1383 BeforeEach(func() { 1384 fakePool.FindOrChooseWorkerReturns(fakeChosenWorker, nil) 1385 }) 1386 It("finds or creates a put container on that worker", func() { 1387 Expect(fakeChosenWorker.FindOrCreateContainerCallCount()).To(Equal(1)) 1388 _, _, actualDelegate, actualOwner, actualMetadata, actualContainerSpec, actualResourceTypes := fakeChosenWorker.FindOrCreateContainerArgsForCall(0) 1389 1390 Expect(actualContainerSpec).To(Equal(containerSpec)) 1391 Expect(actualDelegate).To(Equal(fakeDelegate)) 1392 Expect(actualOwner).To(Equal(owner)) 1393 Expect(actualMetadata).To(Equal(metadata)) 1394 Expect(actualResourceTypes).To(Equal(fakeResourceTypes)) 1395 }) 1396 }) 1397 1398 Context("worker selection returns an error", func() { 1399 BeforeEach(func() { 1400 fakePool.FindOrChooseWorkerForContainerReturns(nil, disasterErr) 1401 }) 1402 1403 It("returns the error", func() { 1404 Expect(err).To(HaveOccurred()) 1405 Expect(err).To(Equal(disasterErr)) 1406 Expect(versionResult).To(Equal(runtime.VersionResult{})) 1407 }) 1408 }) 1409 1410 Context("found a container that has run resource.Put and exited", func() { 1411 BeforeEach(func() { 1412 fakeChosenWorker.FindOrCreateContainerReturns(fakeContainer, nil) 1413 fakeContainer.PropertyStub = func(prop string) (result string, err error) { 1414 if prop == "concourse:exit-status" { 1415 return "8", nil 1416 } 1417 return "", errors.New("unhandled property") 1418 } 1419 }) 1420 1421 It("does not invoke resource.Put", func() { 1422 Expect(fakeResource.PutCallCount()).To(Equal(0)) 1423 }) 1424 1425 It("returns result of container process", func() { 1426 Expect(err).ToNot(HaveOccurred()) 1427 Expect(status).To(Equal(8)) 1428 }) 1429 }) 1430 1431 Context("calling resource.Put", func() { 1432 BeforeEach(func() { 1433 fakeChosenWorker.FindOrCreateContainerReturns(fakeContainer, nil) 1434 fakeContainer.PropertyReturns("0", fmt.Errorf("property not found")) 1435 }) 1436 1437 It("invokes the Starting Event on the delegate", func() { 1438 Expect(fakeEventDelegate.StartingCallCount()).Should((Equal(1))) 1439 }) 1440 1441 It("calls resource.Put with the correct ctx, processSpec and container", func() { 1442 actualCtx, actualProcessSpec, actualContainer := fakeResource.PutArgsForCall(0) 1443 Expect(actualCtx).To(Equal(ctx)) 1444 Expect(actualProcessSpec).To(Equal(fakeProcessSpec)) 1445 Expect(actualContainer).To(Equal(fakeContainer)) 1446 }) 1447 1448 Context("when PUT returns an error", func() { 1449 1450 Context("when the error is ErrResourceScriptFailed", func() { 1451 var ( 1452 scriptFailErr runtime.ErrResourceScriptFailed 1453 ) 1454 BeforeEach(func() { 1455 scriptFailErr = runtime.ErrResourceScriptFailed{ 1456 ExitStatus: 10, 1457 } 1458 1459 fakeResource.PutReturns( 1460 runtime.VersionResult{}, 1461 scriptFailErr, 1462 ) 1463 }) 1464 1465 It("returns a PutResult with the exit status from ErrResourceScriptFailed", func() { 1466 Expect(status).To(Equal(10)) 1467 Expect(err).To(BeNil()) 1468 }) 1469 }) 1470 1471 Context("when the error is NOT ErrResourceScriptFailed", func() { 1472 BeforeEach(func() { 1473 fakeResource.PutReturns( 1474 runtime.VersionResult{}, 1475 disasterErr, 1476 ) 1477 }) 1478 1479 It("returns an error", func() { 1480 Expect(err).To(Equal(disasterErr)) 1481 }) 1482 1483 }) 1484 }) 1485 1486 Context("when PUT succeeds", func() { 1487 var expectedVersionResult runtime.VersionResult 1488 BeforeEach(func() { 1489 expectedVersionResult = runtime.VersionResult{ 1490 Version: atc.Version(map[string]string{"foo": "bar"}), 1491 Metadata: nil, 1492 } 1493 1494 fakeResource.PutReturns(expectedVersionResult, nil) 1495 }) 1496 It("returns the correct VersionResult and ExitStatus", func() { 1497 Expect(err).To(BeNil()) 1498 Expect(status).To(Equal(0)) 1499 Expect(versionResult).To(Equal(expectedVersionResult)) 1500 }) 1501 }) 1502 }) 1503 1504 Context("worker.FindOrCreateContainer errored", func() { 1505 BeforeEach(func() { 1506 fakeChosenWorker.FindOrCreateContainerReturns(nil, disasterErr) 1507 }) 1508 1509 It("returns the error immediately", func() { 1510 Expect(err).To(HaveOccurred()) 1511 Expect(err).To(Equal(disasterErr)) 1512 Expect(versionResult).To(Equal(runtime.VersionResult{})) 1513 }) 1514 }) 1515 }) 1516}) 1517