1package worker_test
2
3import (
4	"bytes"
5	"context"
6	"errors"
7	"fmt"
8	"path"
9	"time"
10
11	"code.cloudfoundry.org/garden"
12	"code.cloudfoundry.org/garden/gardenfakes"
13	"github.com/concourse/concourse/atc"
14	"github.com/concourse/concourse/atc/compression/compressionfakes"
15	"github.com/concourse/concourse/atc/db/dbfakes"
16	"github.com/concourse/concourse/atc/db/lock/lockfakes"
17	"github.com/concourse/concourse/atc/exec/execfakes"
18	"github.com/concourse/concourse/atc/resource/resourcefakes"
19	"github.com/concourse/concourse/atc/runtime"
20	"github.com/concourse/concourse/atc/runtime/runtimefakes"
21	"github.com/onsi/gomega/gbytes"
22
23	"code.cloudfoundry.org/lager/lagertest"
24	"github.com/concourse/baggageclaim"
25	"github.com/concourse/concourse/atc/db"
26	"github.com/concourse/concourse/atc/worker"
27	"github.com/concourse/concourse/atc/worker/workerfakes"
28
29	. "github.com/onsi/ginkgo"
30	. "github.com/onsi/gomega"
31)
32
33var _ = Describe("Client", func() {
34	var (
35		logger          *lagertest.TestLogger
36		fakePool        *workerfakes.FakePool
37		fakeProvider    *workerfakes.FakeWorkerProvider
38		client          worker.Client
39		fakeLock        *lockfakes.FakeLock
40		fakeLockFactory *lockfakes.FakeLockFactory
41		fakeCompression *compressionfakes.FakeCompression
42	)
43
44	BeforeEach(func() {
45		logger = lagertest.NewTestLogger("test")
46		fakePool = new(workerfakes.FakePool)
47		fakeProvider = new(workerfakes.FakeWorkerProvider)
48		fakeCompression = new(compressionfakes.FakeCompression)
49		workerPolling := 1 * time.Second
50		workerStatus := 2 * time.Second
51
52		client = worker.NewClient(fakePool, fakeProvider, fakeCompression, workerPolling, workerStatus)
53	})
54
55	Describe("FindContainer", func() {
56		var (
57			foundContainer worker.Container
58			found          bool
59			findErr        error
60		)
61
62		JustBeforeEach(func() {
63			foundContainer, found, findErr = client.FindContainer(
64				logger,
65				4567,
66				"some-handle",
67			)
68		})
69
70		Context("when looking up the worker errors", func() {
71			BeforeEach(func() {
72				fakeProvider.FindWorkerForContainerReturns(nil, false, errors.New("nope"))
73			})
74
75			It("errors", func() {
76				Expect(findErr).To(HaveOccurred())
77			})
78		})
79
80		Context("when worker is not found", func() {
81			BeforeEach(func() {
82				fakeProvider.FindWorkerForContainerReturns(nil, false, nil)
83			})
84
85			It("returns not found", func() {
86				Expect(findErr).NotTo(HaveOccurred())
87				Expect(found).To(BeFalse())
88			})
89		})
90
91		Context("when a worker is found with the container", func() {
92			var fakeWorker *workerfakes.FakeWorker
93			var fakeContainer *workerfakes.FakeContainer
94
95			BeforeEach(func() {
96				fakeWorker = new(workerfakes.FakeWorker)
97				fakeProvider.FindWorkerForContainerReturns(fakeWorker, true, nil)
98
99				fakeContainer = new(workerfakes.FakeContainer)
100				fakeWorker.FindContainerByHandleReturns(fakeContainer, true, nil)
101			})
102
103			It("succeeds", func() {
104				Expect(found).To(BeTrue())
105				Expect(findErr).NotTo(HaveOccurred())
106			})
107
108			It("returns the created container", func() {
109				Expect(foundContainer).To(Equal(fakeContainer))
110			})
111		})
112	})
113
114	Describe("FindVolume", func() {
115		var (
116			foundVolume worker.Volume
117			found       bool
118			findErr     error
119		)
120
121		JustBeforeEach(func() {
122			foundVolume, found, findErr = client.FindVolume(
123				logger,
124				4567,
125				"some-handle",
126			)
127		})
128
129		Context("when looking up the worker errors", func() {
130			BeforeEach(func() {
131				fakeProvider.FindWorkerForVolumeReturns(nil, false, errors.New("nope"))
132			})
133
134			It("errors", func() {
135				Expect(findErr).To(HaveOccurred())
136			})
137		})
138
139		Context("when worker is not found", func() {
140			BeforeEach(func() {
141				fakeProvider.FindWorkerForVolumeReturns(nil, false, nil)
142			})
143
144			It("returns not found", func() {
145				Expect(findErr).NotTo(HaveOccurred())
146				Expect(found).To(BeFalse())
147			})
148		})
149
150		Context("when a worker is found with the volume", func() {
151			var fakeWorker *workerfakes.FakeWorker
152			var fakeVolume *workerfakes.FakeVolume
153
154			BeforeEach(func() {
155				fakeWorker = new(workerfakes.FakeWorker)
156				fakeProvider.FindWorkerForVolumeReturns(fakeWorker, true, nil)
157
158				fakeVolume = new(workerfakes.FakeVolume)
159				fakeWorker.LookupVolumeReturns(fakeVolume, true, nil)
160			})
161
162			It("succeeds", func() {
163				Expect(found).To(BeTrue())
164				Expect(findErr).NotTo(HaveOccurred())
165			})
166
167			It("returns the volume", func() {
168				Expect(foundVolume).To(Equal(fakeVolume))
169			})
170		})
171	})
172
173	Describe("CreateVolume", func() {
174		var (
175			fakeWorker *workerfakes.FakeWorker
176			volumeSpec worker.VolumeSpec
177			workerSpec worker.WorkerSpec
178			volumeType db.VolumeType
179			err        error
180		)
181
182		BeforeEach(func() {
183			volumeSpec = worker.VolumeSpec{
184				Strategy: baggageclaim.EmptyStrategy{},
185			}
186
187			workerSpec = worker.WorkerSpec{
188				TeamID: 1,
189			}
190
191			volumeType = db.VolumeTypeArtifact
192		})
193
194		JustBeforeEach(func() {
195			_, err = client.CreateVolume(logger, volumeSpec, workerSpec, volumeType)
196		})
197
198		Context("when no workers can be found", func() {
199			BeforeEach(func() {
200				fakePool.FindOrChooseWorkerReturns(nil, errors.New("nope"))
201			})
202
203			It("returns an error", func() {
204				Expect(err).To(HaveOccurred())
205			})
206		})
207
208		Context("when the worker can be found", func() {
209			BeforeEach(func() {
210				fakeWorker = new(workerfakes.FakeWorker)
211				fakePool.FindOrChooseWorkerReturns(fakeWorker, nil)
212			})
213
214			It("creates the volume on the worker", func() {
215				Expect(err).ToNot(HaveOccurred())
216				Expect(fakeWorker.CreateVolumeCallCount()).To(Equal(1))
217				l, spec, id, t := fakeWorker.CreateVolumeArgsForCall(0)
218				Expect(l).To(Equal(logger))
219				Expect(spec).To(Equal(volumeSpec))
220				Expect(id).To(Equal(1))
221				Expect(t).To(Equal(volumeType))
222			})
223		})
224	})
225
226	Describe("RunCheckStep", func() {
227
228		var (
229			result           worker.CheckResult
230			err, expectedErr error
231			fakeResource     *resourcefakes.FakeResource
232			fakeDelegate     *execfakes.FakeBuildStepDelegate
233		)
234
235		BeforeEach(func() {
236			fakeResource = new(resourcefakes.FakeResource)
237			fakeDelegate = new(execfakes.FakeBuildStepDelegate)
238		})
239
240		JustBeforeEach(func() {
241			owner := new(dbfakes.FakeContainerOwner)
242			containerSpec := worker.ContainerSpec{}
243			fakeStrategy := new(workerfakes.FakeContainerPlacementStrategy)
244			workerSpec := worker.WorkerSpec{}
245			fakeResourceTypes := atc.VersionedResourceTypes{}
246
247			imageSpec := worker.ImageFetcherSpec{
248				Delegate:      fakeDelegate,
249				ResourceTypes: fakeResourceTypes,
250			}
251
252			result, err = client.RunCheckStep(
253				context.Background(),
254				logger,
255				owner,
256				containerSpec,
257				workerSpec,
258				fakeStrategy,
259				metadata,
260				imageSpec,
261				1*time.Nanosecond,
262				fakeResource,
263			)
264		})
265
266		Context("faling to find worker for container", func() {
267			BeforeEach(func() {
268				expectedErr = errors.New("find-worker-err")
269
270				fakePool.FindOrChooseWorkerForContainerReturns(nil, expectedErr)
271			})
272
273			It("errors", func() {
274				Expect(err).To(HaveOccurred())
275				Expect(errors.Is(err, expectedErr)).To(BeTrue())
276			})
277		})
278
279		Context("having found a worker", func() {
280			var fakeWorker *workerfakes.FakeWorker
281
282			BeforeEach(func() {
283				fakeWorker = new(workerfakes.FakeWorker)
284				fakePool.FindOrChooseWorkerForContainerReturns(fakeWorker, nil)
285			})
286
287			Context("failing to find or create container in the worker", func() {
288				BeforeEach(func() {
289					expectedErr = errors.New("find-or-create-container-err")
290					fakeWorker.FindOrCreateContainerReturns(nil, expectedErr)
291				})
292
293				It("errors", func() {
294					Expect(errors.Is(err, expectedErr)).To(BeTrue())
295				})
296			})
297
298			Context("having found a container", func() {
299				var fakeContainer *workerfakes.FakeContainer
300
301				BeforeEach(func() {
302					fakeContainer = new(workerfakes.FakeContainer)
303					fakeWorker.FindOrCreateContainerReturns(fakeContainer, nil)
304				})
305
306				Context("check failing", func() {
307					BeforeEach(func() {
308						expectedErr = errors.New("check-err")
309						fakeResource.CheckReturns(nil, expectedErr)
310					})
311
312					It("errors", func() {
313						Expect(errors.Is(err, expectedErr)).To(BeTrue())
314					})
315				})
316
317				It("runs check w/ timeout", func() {
318					ctx, _, _ := fakeResource.CheckArgsForCall(0)
319					_, hasDeadline := ctx.Deadline()
320
321					Expect(hasDeadline).To(BeTrue())
322				})
323
324				It("uses the right executable path in the proc spec", func() {
325					_, processSpec, _ := fakeResource.CheckArgsForCall(0)
326
327					Expect(processSpec).To(Equal(runtime.ProcessSpec{
328						Path: "/opt/resource/check",
329					}))
330				})
331
332				It("uses the container as the runner", func() {
333					_, _, container := fakeResource.CheckArgsForCall(0)
334
335					Expect(container).To(Equal(fakeContainer))
336				})
337
338				Context("succeeding", func() {
339					BeforeEach(func() {
340						fakeResource.CheckReturns([]atc.Version{
341							{"version": "1"},
342						}, nil)
343					})
344
345					It("returns the versions", func() {
346						Expect(result.Versions).To(HaveLen(1))
347						Expect(result.Versions[0]).To(Equal(atc.Version{"version": "1"}))
348					})
349				})
350			})
351		})
352
353	})
354
355	Describe("RunGetStep", func() {
356
357		var (
358			ctx                   context.Context
359			owner                 db.ContainerOwner
360			containerSpec         worker.ContainerSpec
361			workerSpec            worker.WorkerSpec
362			metadata              db.ContainerMetadata
363			imageSpec             worker.ImageFetcherSpec
364			fakeChosenWorker      *workerfakes.FakeWorker
365			fakeStrategy          *workerfakes.FakeContainerPlacementStrategy
366			fakeDelegate          *workerfakes.FakeImageFetchingDelegate
367			fakeEventDelegate     *runtimefakes.FakeStartingEventDelegate
368			fakeResourceTypes     atc.VersionedResourceTypes
369			fakeContainer         *workerfakes.FakeContainer
370			fakeProcessSpec       runtime.ProcessSpec
371			fakeResource          *resourcefakes.FakeResource
372			fakeUsedResourceCache *dbfakes.FakeUsedResourceCache
373
374			err error
375
376			disasterErr error
377
378			result worker.GetResult
379		)
380
381		BeforeEach(func() {
382			ctx, _ = context.WithCancel(context.Background())
383			owner = new(dbfakes.FakeContainerOwner)
384			containerSpec = worker.ContainerSpec{}
385			fakeStrategy = new(workerfakes.FakeContainerPlacementStrategy)
386			workerSpec = worker.WorkerSpec{}
387			fakeChosenWorker = new(workerfakes.FakeWorker)
388			fakeDelegate = new(workerfakes.FakeImageFetchingDelegate)
389			fakeEventDelegate = new(runtimefakes.FakeStartingEventDelegate)
390			fakeResourceTypes = atc.VersionedResourceTypes{}
391			imageSpec = worker.ImageFetcherSpec{
392				Delegate:      fakeDelegate,
393				ResourceTypes: fakeResourceTypes,
394			}
395
396			fakeResource = new(resourcefakes.FakeResource)
397			fakeContainer = new(workerfakes.FakeContainer)
398			disasterErr = errors.New("oh no")
399			stdout := new(gbytes.Buffer)
400			stderr := new(gbytes.Buffer)
401			fakeProcessSpec = runtime.ProcessSpec{
402				Path:         "/opt/resource/out",
403				StdoutWriter: stdout,
404				StderrWriter: stderr,
405			}
406			fakeUsedResourceCache = new(dbfakes.FakeUsedResourceCache)
407
408			fakeChosenWorker = new(workerfakes.FakeWorker)
409			fakeChosenWorker.NameReturns("some-worker")
410			fakeChosenWorker.SatisfiesReturns(true)
411			fakeChosenWorker.FindOrCreateContainerReturns(fakeContainer, nil)
412			fakePool.FindOrChooseWorkerForContainerReturns(fakeChosenWorker, nil)
413
414		})
415
416		JustBeforeEach(func() {
417			result, err = client.RunGetStep(
418				ctx,
419				logger,
420				owner,
421				containerSpec,
422				workerSpec,
423				fakeStrategy,
424				metadata,
425				imageSpec,
426				fakeProcessSpec,
427				fakeEventDelegate,
428				fakeUsedResourceCache,
429				fakeResource,
430			)
431		})
432
433		It("finds/chooses a worker", func() {
434			Expect(err).ToNot(HaveOccurred())
435
436			Expect(fakePool.FindOrChooseWorkerForContainerCallCount()).To(Equal(1))
437
438			_, _, actualOwner, actualContainerSpec, actualWorkerSpec, actualStrategy := fakePool.FindOrChooseWorkerForContainerArgsForCall(0)
439			Expect(actualOwner).To(Equal(owner))
440			Expect(actualContainerSpec).To(Equal(containerSpec))
441			Expect(actualWorkerSpec).To(Equal(workerSpec))
442			Expect(actualStrategy).To(Equal(fakeStrategy))
443		})
444
445		It("invokes the SelectedWorker Event on the delegate", func() {
446			Expect(fakeEventDelegate.SelectedWorkerCallCount()).Should((Equal(1)))
447		})
448
449		Context("worker is chosen", func() {
450			BeforeEach(func() {
451				fakePool.FindOrChooseWorkerReturns(fakeChosenWorker, nil)
452			})
453
454			It("invokes the Starting Event on the delegate", func() {
455				Expect(fakeEventDelegate.StartingCallCount()).Should((Equal(1)))
456				_, actualWorkerName := fakeEventDelegate.SelectedWorkerArgsForCall(0)
457				Expect(actualWorkerName).To(Equal(fakeChosenWorker.Name()))
458			})
459
460			It("calls Fetch on the worker", func() {
461				Expect(fakeChosenWorker.FetchCallCount()).To(Equal(1))
462				_, _, actualMetadata, actualChosenWorker, actualContainerSpec, actualProcessSpec, actualResource, actualOwner, actualImageFetcherSpec, actualResourceCache, actualLockName := fakeChosenWorker.FetchArgsForCall(0)
463
464				Expect(actualMetadata).To(Equal(metadata))
465				Expect(actualChosenWorker).To(Equal(fakeChosenWorker))
466				Expect(actualContainerSpec).To(Equal(containerSpec))
467				Expect(actualProcessSpec).To(Equal(fakeProcessSpec))
468				Expect(actualResource).To(Equal(fakeResource))
469				Expect(actualOwner).To(Equal(owner))
470				Expect(actualImageFetcherSpec).To(Equal(imageSpec))
471				Expect(actualResourceCache).To(Equal(fakeUsedResourceCache))
472				// Computed SHA
473				Expect(actualLockName).To(Equal("18c3de3f8ea112ba52e01f279b6cc62335b4bec2f359b9be7636a5ad7bf98f8c"))
474			})
475		})
476
477		Context("Worker selection returns an error", func() {
478			BeforeEach(func() {
479				fakePool.FindOrChooseWorkerForContainerReturns(nil, disasterErr)
480			})
481
482			It("Returns the error", func() {
483				Expect(err).To(HaveOccurred())
484				Expect(err).To(Equal(disasterErr))
485
486				Expect(result).To(Equal(worker.GetResult{}))
487			})
488		})
489
490		Context("Calling chosenWorker.Fetch", func() {
491			var (
492				someError     error
493				someGetResult worker.GetResult
494				fakeVolume    *workerfakes.FakeVolume
495			)
496			BeforeEach(func() {
497				someGetResult = worker.GetResult{
498					ExitStatus: 0,
499					VersionResult: runtime.VersionResult{
500						atc.Version{"some-version": "some-value"},
501						[]atc.MetadataField{{"foo", "bar"}},
502					},
503				}
504				someError = errors.New("some-foo-error")
505				fakeVolume = new(workerfakes.FakeVolume)
506				fakeChosenWorker.FetchReturns(someGetResult, fakeVolume, someError)
507			})
508			It("returns getResult & err", func() {
509				Expect(result).To(Equal(someGetResult))
510				Expect(err).To(Equal(someError))
511			})
512		})
513	})
514
515	Describe("RunTaskStep", func() {
516		var (
517			status       int
518			volumeMounts []worker.VolumeMount
519			inputSources []worker.InputSource
520			taskResult   worker.TaskResult
521			err          error
522
523			fakeWorker           *workerfakes.FakeWorker
524			fakeContainerOwner   db.ContainerOwner
525			fakeWorkerSpec       worker.WorkerSpec
526			fakeContainerSpec    worker.ContainerSpec
527			fakeStrategy         *workerfakes.FakeContainerPlacementStrategy
528			fakeMetadata         db.ContainerMetadata
529			fakeDelegate         *execfakes.FakeTaskDelegate
530			fakeImageFetcherSpec worker.ImageFetcherSpec
531			fakeTaskProcessSpec  runtime.ProcessSpec
532			fakeContainer        *workerfakes.FakeContainer
533			fakeEventDelegate    *runtimefakes.FakeStartingEventDelegate
534
535			ctx    context.Context
536			cancel func()
537		)
538
539		BeforeEach(func() {
540			cpu := uint64(1024)
541			memory := uint64(1024)
542
543			buildId := 1234
544			planId := atc.PlanID("42")
545			teamId := 123
546			fakeDelegate = new(execfakes.FakeTaskDelegate)
547			fakeContainerOwner = db.NewBuildStepContainerOwner(
548				buildId,
549				planId,
550				teamId,
551			)
552			fakeWorkerSpec = worker.WorkerSpec{}
553			fakeContainerSpec = worker.ContainerSpec{
554				Platform: "some-platform",
555				Tags:     []string{"step", "tags"},
556				TeamID:   123,
557				ImageSpec: worker.ImageSpec{
558					ImageResource: &worker.ImageResource{
559						Type:    "docker",
560						Source:  atc.Source{"some": "secret-source-param"},
561						Params:  atc.Params{"some": "params"},
562						Version: atc.Version{"some": "version"},
563					},
564					Privileged: false,
565				},
566				Limits: worker.ContainerLimits{
567					CPU:    &cpu,
568					Memory: &memory,
569				},
570				Dir:            "some-artifact-root",
571				Env:            []string{"SECURE=secret-task-param"},
572				ArtifactByPath: map[string]runtime.Artifact{},
573				Inputs:         inputSources,
574				Outputs:        worker.OutputPaths{},
575			}
576			fakeStrategy = new(workerfakes.FakeContainerPlacementStrategy)
577			fakeMetadata = db.ContainerMetadata{
578				WorkingDirectory: "some-artifact-root",
579				Type:             db.ContainerTypeTask,
580				StepName:         "some-step",
581			}
582			fakeImageFetcherSpec = worker.ImageFetcherSpec{
583				Delegate:      fakeDelegate,
584				ResourceTypes: atc.VersionedResourceTypes{},
585			}
586			fakeTaskProcessSpec = runtime.ProcessSpec{
587				Path:         "/some/path",
588				Args:         []string{"some", "args"},
589				Dir:          "/some/dir",
590				StdoutWriter: new(bytes.Buffer),
591				StderrWriter: new(bytes.Buffer),
592			}
593			fakeContainer = new(workerfakes.FakeContainer)
594			fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "0"}, nil)
595
596			fakeWorker = new(workerfakes.FakeWorker)
597			fakeWorker.NameReturns("some-worker")
598			fakeWorker.SatisfiesReturns(true)
599			fakeWorker.FindOrCreateContainerReturns(fakeContainer, nil)
600
601			fakeWorker.IncreaseActiveTasksStub = func() error {
602				fakeWorker.ActiveTasksReturns(1, nil)
603				return nil
604			}
605
606			fakeWorker.DecreaseActiveTasksStub = func() error {
607				fakeWorker.ActiveTasksReturns(0, nil)
608				return nil
609			}
610
611			fakeEventDelegate = new(runtimefakes.FakeStartingEventDelegate)
612
613			fakeLockFactory = new(lockfakes.FakeLockFactory)
614			fakeLock = new(lockfakes.FakeLock)
615			fakeLockFactory.AcquireReturns(fakeLock, true, nil)
616
617			fakePool.FindOrChooseWorkerForContainerReturns(fakeWorker, nil)
618			ctx, cancel = context.WithCancel(context.Background())
619		})
620
621		JustBeforeEach(func() {
622			taskResult, err = client.RunTaskStep(
623				ctx,
624				logger,
625				fakeContainerOwner,
626				fakeContainerSpec,
627				fakeWorkerSpec,
628				fakeStrategy,
629				fakeMetadata,
630				fakeImageFetcherSpec,
631				fakeTaskProcessSpec,
632				fakeEventDelegate,
633				fakeLockFactory,
634			)
635			status = taskResult.ExitStatus
636			volumeMounts = taskResult.VolumeMounts
637		})
638
639		Context("choosing a worker", func() {
640			BeforeEach(func() {
641				// later fakes are uninitialized
642				fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "3"}, nil)
643			})
644
645			It("chooses a worker", func() {
646				Expect(err).ToNot(HaveOccurred())
647				Expect(fakePool.FindOrChooseWorkerForContainerCallCount()).To(Equal(1))
648				_, actualWorkerName := fakeEventDelegate.SelectedWorkerArgsForCall(0)
649				Expect(actualWorkerName).To(Equal(fakeWorker.Name()))
650			})
651
652			It("invokes the SelectedWorker Event on the delegate", func() {
653				Expect(fakeEventDelegate.SelectedWorkerCallCount()).Should((Equal(1)))
654			})
655
656			Context("when 'limit-active-tasks' strategy is chosen", func() {
657				BeforeEach(func() {
658					fakeStrategy.ModifiesActiveTasksReturns(true)
659				})
660				Context("when a worker is found", func() {
661					BeforeEach(func() {
662						fakeWorker.NameReturns("some-worker")
663						fakePool.FindOrChooseWorkerForContainerReturns(fakeWorker, nil)
664
665						fakeContainer := new(workerfakes.FakeContainer)
666						fakeWorker.FindOrCreateContainerReturns(fakeContainer, nil)
667						fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "0"}, nil)
668					})
669					It("increase the active tasks on the worker", func() {
670						Expect(fakeWorker.IncreaseActiveTasksCallCount()).To(Equal(1))
671					})
672
673					Context("when the container is already present on the worker", func() {
674						BeforeEach(func() {
675							fakePool.ContainerInWorkerReturns(true, nil)
676						})
677						It("does not increase the active tasks on the worker", func() {
678							Expect(fakeWorker.IncreaseActiveTasksCallCount()).To(Equal(0))
679						})
680					})
681				})
682
683				Context("when the task is aborted waiting for an available worker", func() {
684					BeforeEach(func() {
685						cancel()
686					})
687					It("exits releasing the lock", func() {
688						Expect(err.Error()).To(ContainSubstring(context.Canceled.Error()))
689						Expect(fakeLock.ReleaseCallCount()).To(Equal(fakeLockFactory.AcquireCallCount()))
690					})
691				})
692
693				Context("when a container in worker returns an error", func() {
694					BeforeEach(func() {
695						fakePool.ContainerInWorkerReturns(false, errors.New("nope"))
696					})
697					It("release the task-step lock every time it acquires it", func() {
698						Expect(fakeLock.ReleaseCallCount()).To(Equal(fakeLockFactory.AcquireCallCount()))
699					})
700				})
701			})
702
703			Context("when finding or choosing the worker errors", func() {
704				workerDisaster := errors.New("worker selection errored")
705
706				BeforeEach(func() {
707					fakePool.FindOrChooseWorkerForContainerReturns(nil, workerDisaster)
708				})
709
710				It("returns the error", func() {
711					Expect(err).To(Equal(workerDisaster))
712				})
713
714				It("should not invokes the SelectedWorker Event on the delegate", func() {
715					Expect(fakeEventDelegate.SelectedWorkerCallCount()).Should((Equal(0)))
716				})
717			})
718
719		})
720
721		It("finds or creates a container", func() {
722			Expect(fakeWorker.FindOrCreateContainerCallCount()).To(Equal(1))
723			_, cancel, delegate, owner, createdMetadata, containerSpec, _ := fakeWorker.FindOrCreateContainerArgsForCall(0)
724			Expect(containerSpec.Inputs).To(Equal(fakeContainerSpec.Inputs))
725			Expect(containerSpec).To(Equal(fakeContainerSpec))
726			Expect(cancel).ToNot(BeNil())
727			Expect(owner).To(Equal(fakeContainerOwner))
728			Expect(delegate).To(Equal(fakeDelegate))
729			Expect(createdMetadata).To(Equal(db.ContainerMetadata{
730				WorkingDirectory: "some-artifact-root",
731				Type:             db.ContainerTypeTask,
732				StepName:         "some-step",
733			}))
734		})
735
736		Context("found a container that has already exited", func() {
737			BeforeEach(func() {
738				fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "8"}, nil)
739			})
740
741			It("does not attach to any process", func() {
742				Expect(fakeContainer.AttachCallCount()).To(BeZero())
743			})
744
745			It("returns result of container process", func() {
746				Expect(err).ToNot(HaveOccurred())
747				Expect(status).To(Equal(8))
748			})
749
750			Context("when 'limit-active-tasks' strategy is chosen", func() {
751				BeforeEach(func() {
752					fakeStrategy.ModifiesActiveTasksReturns(true)
753				})
754
755				It("decrements the active tasks counter on the worker", func() {
756					Expect(fakeWorker.ActiveTasks()).To(Equal(0))
757				})
758			})
759
760			Context("when volumes are configured and present on the container", func() {
761				var (
762					fakeMountPath1 = "some-artifact-root/some-output-configured-path/"
763					fakeMountPath2 = "some-artifact-root/some-other-output/"
764					fakeMountPath3 = "some-artifact-root/some-output-configured-path-with-trailing-slash/"
765
766					fakeVolume1 *workerfakes.FakeVolume
767					fakeVolume2 *workerfakes.FakeVolume
768					fakeVolume3 *workerfakes.FakeVolume
769				)
770
771				BeforeEach(func() {
772					fakeVolume1 = new(workerfakes.FakeVolume)
773					fakeVolume1.HandleReturns("some-handle-1")
774					fakeVolume2 = new(workerfakes.FakeVolume)
775					fakeVolume2.HandleReturns("some-handle-2")
776					fakeVolume3 = new(workerfakes.FakeVolume)
777					fakeVolume3.HandleReturns("some-handle-3")
778
779					fakeContainer.VolumeMountsReturns([]worker.VolumeMount{
780						{
781							Volume:    fakeVolume1,
782							MountPath: fakeMountPath1,
783						},
784						{
785							Volume:    fakeVolume2,
786							MountPath: fakeMountPath2,
787						},
788						{
789							Volume:    fakeVolume3,
790							MountPath: fakeMountPath3,
791						},
792					})
793				})
794
795				It("returns all the volume mounts", func() {
796					Expect(volumeMounts).To(ConsistOf(
797						worker.VolumeMount{
798							Volume:    fakeVolume1,
799							MountPath: fakeMountPath1,
800						},
801						worker.VolumeMount{
802							Volume:    fakeVolume2,
803							MountPath: fakeMountPath2,
804						},
805						worker.VolumeMount{
806							Volume:    fakeVolume3,
807							MountPath: fakeMountPath3,
808						},
809					))
810				})
811			})
812		})
813
814		Context("container has not already exited", func() {
815			var (
816				fakeProcess         *gardenfakes.FakeProcess
817				fakeProcessExitCode int
818
819				fakeMountPath1 = "some-artifact-root/some-output-configured-path/"
820				fakeMountPath2 = "some-artifact-root/some-other-output/"
821				fakeMountPath3 = "some-artifact-root/some-output-configured-path-with-trailing-slash/"
822
823				fakeVolume1 *workerfakes.FakeVolume
824				fakeVolume2 *workerfakes.FakeVolume
825				fakeVolume3 *workerfakes.FakeVolume
826
827				stdoutBuf *gbytes.Buffer
828				stderrBuf *gbytes.Buffer
829			)
830
831			BeforeEach(func() {
832				fakeProcess = new(gardenfakes.FakeProcess)
833				fakeContainer.PropertiesReturns(garden.Properties{}, nil)
834
835				// for testing volume mounts being returned
836				fakeVolume1 = new(workerfakes.FakeVolume)
837				fakeVolume1.HandleReturns("some-handle-1")
838				fakeVolume2 = new(workerfakes.FakeVolume)
839				fakeVolume2.HandleReturns("some-handle-2")
840				fakeVolume3 = new(workerfakes.FakeVolume)
841				fakeVolume3.HandleReturns("some-handle-3")
842
843				fakeContainer.VolumeMountsReturns([]worker.VolumeMount{
844					{
845						Volume:    fakeVolume1,
846						MountPath: fakeMountPath1,
847					},
848					{
849						Volume:    fakeVolume2,
850						MountPath: fakeMountPath2,
851					},
852					{
853						Volume:    fakeVolume3,
854						MountPath: fakeMountPath3,
855					},
856				})
857			})
858
859			Context("found container that is already running", func() {
860				BeforeEach(func() {
861					fakeContainer.AttachReturns(fakeProcess, nil)
862
863					stdoutBuf = new(gbytes.Buffer)
864					stderrBuf = new(gbytes.Buffer)
865					fakeTaskProcessSpec = runtime.ProcessSpec{
866						StdoutWriter: stdoutBuf,
867						StderrWriter: stderrBuf,
868					}
869				})
870
871				It("does not create a new container", func() {
872					Expect(fakeContainer.RunCallCount()).To(BeZero())
873				})
874
875				It("attaches to the running process", func() {
876					Expect(err).ToNot(HaveOccurred())
877					Expect(fakeContainer.AttachCallCount()).To(Equal(1))
878					Expect(fakeContainer.RunCallCount()).To(Equal(0))
879					_, _, actualProcessIO := fakeContainer.AttachArgsForCall(0)
880					Expect(actualProcessIO.Stdout).To(Equal(stdoutBuf))
881					Expect(actualProcessIO.Stderr).To(Equal(stderrBuf))
882				})
883
884				Context("when the process is interrupted", func() {
885					var stopped chan struct{}
886					BeforeEach(func() {
887						stopped = make(chan struct{})
888
889						fakeProcess.WaitStub = func() (int, error) {
890							defer GinkgoRecover()
891
892							<-stopped
893							return 128 + 15, nil
894						}
895
896						fakeContainer.StopStub = func(bool) error {
897							close(stopped)
898							return nil
899						}
900
901						cancel()
902					})
903
904					It("stops the container", func() {
905						Expect(fakeContainer.StopCallCount()).To(Equal(1))
906						Expect(fakeContainer.StopArgsForCall(0)).To(BeFalse())
907						Expect(err).To(Equal(context.Canceled))
908					})
909
910					Context("when container.stop returns an error", func() {
911						var disaster error
912
913						BeforeEach(func() {
914							disaster = errors.New("gotta get away")
915
916							fakeContainer.StopStub = func(bool) error {
917								close(stopped)
918								return disaster
919							}
920						})
921
922						It("doesn't return the error", func() {
923							Expect(err).To(Equal(context.Canceled))
924						})
925					})
926
927					Context("when 'limit-active-tasks' strategy is chosen", func() {
928						BeforeEach(func() {
929							fakeStrategy.ModifiesActiveTasksReturns(true)
930						})
931
932						It("decrements the active tasks counter on the worker", func() {
933							Expect(fakeWorker.ActiveTasks()).To(Equal(0))
934						})
935					})
936				})
937
938				Context("when the process exits successfully", func() {
939					BeforeEach(func() {
940						fakeProcessExitCode = 0
941						fakeProcess.WaitReturns(fakeProcessExitCode, nil)
942					})
943					It("returns a successful result", func() {
944						Expect(status).To(BeZero())
945						Expect(err).ToNot(HaveOccurred())
946					})
947
948					It("returns all the volume mounts", func() {
949						Expect(volumeMounts).To(ConsistOf(
950							worker.VolumeMount{
951								Volume:    fakeVolume1,
952								MountPath: fakeMountPath1,
953							},
954							worker.VolumeMount{
955								Volume:    fakeVolume2,
956								MountPath: fakeMountPath2,
957							},
958							worker.VolumeMount{
959								Volume:    fakeVolume3,
960								MountPath: fakeMountPath3,
961							},
962						))
963					})
964
965					Context("when 'limit-active-tasks' strategy is chosen", func() {
966						BeforeEach(func() {
967							fakeStrategy.ModifiesActiveTasksReturns(true)
968						})
969
970						It("decrements the active tasks counter on the worker", func() {
971							Expect(fakeWorker.ActiveTasks()).To(Equal(0))
972						})
973					})
974				})
975
976				Context("when the process exits with an error", func() {
977					disaster := errors.New("process failed")
978					BeforeEach(func() {
979						fakeProcessExitCode = 128 + 15
980						fakeProcess.WaitReturns(fakeProcessExitCode, disaster)
981					})
982					It("returns an unsuccessful result", func() {
983						Expect(status).To(Equal(fakeProcessExitCode))
984						Expect(err).To(HaveOccurred())
985						Expect(err).To(Equal(disaster))
986					})
987
988					It("returns no volume mounts", func() {
989						Expect(volumeMounts).To(BeEmpty())
990					})
991
992					Context("when 'limit-active-tasks' strategy is chosen", func() {
993						BeforeEach(func() {
994							fakeStrategy.ModifiesActiveTasksReturns(true)
995						})
996
997						It("decrements the active tasks counter on the worker", func() {
998							Expect(fakeWorker.ActiveTasks()).To(Equal(0))
999						})
1000					})
1001				})
1002			})
1003
1004			Context("created a new container", func() {
1005				BeforeEach(func() {
1006					fakeContainer.AttachReturns(nil, errors.New("container not running"))
1007					fakeContainer.RunReturns(fakeProcess, nil)
1008
1009					stdoutBuf = new(gbytes.Buffer)
1010					stderrBuf = new(gbytes.Buffer)
1011					fakeTaskProcessSpec = runtime.ProcessSpec{
1012						StdoutWriter: stdoutBuf,
1013						StderrWriter: stderrBuf,
1014					}
1015				})
1016
1017				It("runs a new process in the container", func() {
1018					Eventually(fakeContainer.RunCallCount()).Should(Equal(1))
1019
1020					_, gardenProcessSpec, actualProcessIO := fakeContainer.RunArgsForCall(0)
1021					Expect(gardenProcessSpec.ID).To(Equal("task"))
1022					Expect(gardenProcessSpec.Path).To(Equal(fakeTaskProcessSpec.Path))
1023					Expect(gardenProcessSpec.Args).To(ConsistOf(fakeTaskProcessSpec.Args))
1024					Expect(gardenProcessSpec.Dir).To(Equal(path.Join(fakeMetadata.WorkingDirectory, fakeTaskProcessSpec.Dir)))
1025					Expect(gardenProcessSpec.TTY).To(Equal(&garden.TTYSpec{WindowSize: &garden.WindowSize{Columns: 500, Rows: 500}}))
1026					Expect(actualProcessIO.Stdout).To(Equal(stdoutBuf))
1027					Expect(actualProcessIO.Stderr).To(Equal(stderrBuf))
1028				})
1029
1030				It("invokes the Starting Event on the delegate", func() {
1031					Expect(fakeEventDelegate.StartingCallCount()).Should((Equal(1)))
1032				})
1033
1034				Context("when the process is interrupted", func() {
1035					var stopped chan struct{}
1036					BeforeEach(func() {
1037						stopped = make(chan struct{})
1038
1039						fakeProcess.WaitStub = func() (int, error) {
1040							defer GinkgoRecover()
1041
1042							<-stopped
1043							return 128 + 15, nil // wat?
1044						}
1045
1046						fakeContainer.StopStub = func(bool) error {
1047							close(stopped)
1048							return nil
1049						}
1050
1051						cancel()
1052					})
1053
1054					It("stops the container", func() {
1055						Expect(fakeContainer.StopCallCount()).To(Equal(1))
1056						Expect(fakeContainer.StopArgsForCall(0)).To(BeFalse())
1057						Expect(err).To(Equal(context.Canceled))
1058					})
1059
1060					Context("when container.stop returns an error", func() {
1061						var disaster error
1062
1063						BeforeEach(func() {
1064							disaster = errors.New("gotta get away")
1065
1066							fakeContainer.StopStub = func(bool) error {
1067								close(stopped)
1068								return disaster
1069							}
1070						})
1071
1072						It("doesn't return the error", func() {
1073							Expect(err).To(Equal(context.Canceled))
1074						})
1075					})
1076				})
1077
1078				Context("when the process exits successfully", func() {
1079					It("returns a successful result", func() {
1080						Expect(status).To(BeZero())
1081						Expect(err).ToNot(HaveOccurred())
1082					})
1083
1084					It("saves the exit status property", func() {
1085						Expect(fakeContainer.SetPropertyCallCount()).To(Equal(1))
1086
1087						name, value := fakeContainer.SetPropertyArgsForCall(0)
1088						Expect(name).To(Equal("concourse:exit-status"))
1089						Expect(value).To(Equal("0"))
1090					})
1091
1092					Context("when saving the exit status succeeds", func() {
1093						BeforeEach(func() {
1094							fakeContainer.SetPropertyReturns(nil)
1095						})
1096
1097						It("returns successfully", func() {
1098							Expect(err).ToNot(HaveOccurred())
1099						})
1100					})
1101
1102					Context("when saving the exit status fails", func() {
1103						disaster := errors.New("nope")
1104
1105						BeforeEach(func() {
1106							fakeContainer.SetPropertyStub = func(name string, value string) error {
1107								defer GinkgoRecover()
1108
1109								if name == "concourse:exit-status" {
1110									return disaster
1111								}
1112
1113								return nil
1114							}
1115						})
1116
1117						It("returns the error", func() {
1118							Expect(err).To(Equal(disaster))
1119						})
1120					})
1121
1122					Context("when volumes are configured and present on the container", func() {
1123						var (
1124							fakeMountPath1 = "some-artifact-root/some-output-configured-path/"
1125							fakeMountPath2 = "some-artifact-root/some-other-output/"
1126							fakeMountPath3 = "some-artifact-root/some-output-configured-path-with-trailing-slash/"
1127
1128							fakeVolume1 *workerfakes.FakeVolume
1129							fakeVolume2 *workerfakes.FakeVolume
1130							fakeVolume3 *workerfakes.FakeVolume
1131						)
1132
1133						BeforeEach(func() {
1134							fakeVolume1 = new(workerfakes.FakeVolume)
1135							fakeVolume1.HandleReturns("some-handle-1")
1136							fakeVolume2 = new(workerfakes.FakeVolume)
1137							fakeVolume2.HandleReturns("some-handle-2")
1138							fakeVolume3 = new(workerfakes.FakeVolume)
1139							fakeVolume3.HandleReturns("some-handle-3")
1140
1141							fakeContainer.VolumeMountsReturns([]worker.VolumeMount{
1142								{
1143									Volume:    fakeVolume1,
1144									MountPath: fakeMountPath1,
1145								},
1146								{
1147									Volume:    fakeVolume2,
1148									MountPath: fakeMountPath2,
1149								},
1150								{
1151									Volume:    fakeVolume3,
1152									MountPath: fakeMountPath3,
1153								},
1154							})
1155						})
1156
1157						It("returns all the volume mounts", func() {
1158							Expect(volumeMounts).To(ConsistOf(
1159								worker.VolumeMount{
1160									Volume:    fakeVolume1,
1161									MountPath: fakeMountPath1,
1162								},
1163								worker.VolumeMount{
1164									Volume:    fakeVolume2,
1165									MountPath: fakeMountPath2,
1166								},
1167								worker.VolumeMount{
1168									Volume:    fakeVolume3,
1169									MountPath: fakeMountPath3,
1170								},
1171							))
1172						})
1173
1174					})
1175
1176					Context("when 'limit-active-tasks' strategy is chosen", func() {
1177						BeforeEach(func() {
1178							fakeStrategy.ModifiesActiveTasksReturns(true)
1179						})
1180
1181						It("decrements the active tasks counter on the worker", func() {
1182							Expect(fakeWorker.ActiveTasks()).To(Equal(0))
1183						})
1184					})
1185				})
1186
1187				Context("when the process exits on failure", func() {
1188					BeforeEach(func() {
1189						fakeProcessExitCode = 128 + 15
1190						fakeProcess.WaitReturns(fakeProcessExitCode, nil)
1191					})
1192					It("returns an unsuccessful result", func() {
1193						Expect(status).To(Equal(fakeProcessExitCode))
1194						Expect(err).ToNot(HaveOccurred())
1195					})
1196
1197					It("saves the exit status property", func() {
1198						Expect(fakeContainer.SetPropertyCallCount()).To(Equal(1))
1199
1200						name, value := fakeContainer.SetPropertyArgsForCall(0)
1201						Expect(name).To(Equal("concourse:exit-status"))
1202						Expect(value).To(Equal(fmt.Sprint(fakeProcessExitCode)))
1203					})
1204
1205					Context("when saving the exit status succeeds", func() {
1206						BeforeEach(func() {
1207							fakeContainer.PropertiesReturns(garden.Properties{"concourse:exit-status": "0"}, nil)
1208						})
1209
1210						It("returns successfully", func() {
1211							Expect(err).ToNot(HaveOccurred())
1212						})
1213					})
1214
1215					Context("when saving the exit status fails", func() {
1216						disaster := errors.New("nope")
1217
1218						BeforeEach(func() {
1219							fakeContainer.SetPropertyStub = func(name string, value string) error {
1220								defer GinkgoRecover()
1221
1222								if name == "concourse:exit-status" {
1223									return disaster
1224								}
1225
1226								return nil
1227							}
1228						})
1229
1230						It("returns the error", func() {
1231							Expect(err).To(Equal(disaster))
1232						})
1233					})
1234
1235					It("returns all the volume mounts", func() {
1236						Expect(volumeMounts).To(ConsistOf(
1237							worker.VolumeMount{
1238								Volume:    fakeVolume1,
1239								MountPath: fakeMountPath1,
1240							},
1241							worker.VolumeMount{
1242								Volume:    fakeVolume2,
1243								MountPath: fakeMountPath2,
1244							},
1245							worker.VolumeMount{
1246								Volume:    fakeVolume3,
1247								MountPath: fakeMountPath3,
1248							},
1249						))
1250					})
1251
1252					Context("when 'limit-active-tasks' strategy is chosen", func() {
1253						BeforeEach(func() {
1254							fakeStrategy.ModifiesActiveTasksReturns(true)
1255						})
1256
1257						It("decrements the active tasks counter on the worker", func() {
1258							Expect(fakeWorker.ActiveTasks()).To(Equal(0))
1259						})
1260					})
1261				})
1262
1263				Context("when running the container fails with an error", func() {
1264					disaster := errors.New("nope")
1265
1266					BeforeEach(func() {
1267						fakeContainer.RunReturns(nil, disaster)
1268					})
1269
1270					It("returns the error", func() {
1271						Expect(err).To(Equal(disaster))
1272					})
1273
1274					Context("when 'limit-active-tasks' strategy is chosen", func() {
1275						BeforeEach(func() {
1276							fakeStrategy.ModifiesActiveTasksReturns(true)
1277						})
1278
1279						It("decrements the active tasks counter on the worker", func() {
1280							Expect(fakeWorker.ActiveTasks()).To(Equal(0))
1281						})
1282					})
1283				})
1284			})
1285		})
1286	})
1287
1288	Describe("RunPutStep", func() {
1289
		// Shared fixtures for the RunPutStep specs.
		var (
			// Inputs passed to client.RunPutStep in JustBeforeEach, plus the fakes
			// behind them. metadata is not assigned anywhere in this Describe, so it
			// is passed as its zero value.
			ctx               context.Context
			owner             db.ContainerOwner
			containerSpec     worker.ContainerSpec
			workerSpec        worker.WorkerSpec
			metadata          db.ContainerMetadata
			imageSpec         worker.ImageFetcherSpec
			fakeChosenWorker  *workerfakes.FakeWorker
			fakeStrategy      *workerfakes.FakeContainerPlacementStrategy
			fakeDelegate      *workerfakes.FakeImageFetchingDelegate
			fakeEventDelegate *runtimefakes.FakeStartingEventDelegate
			fakeResourceTypes atc.VersionedResourceTypes
			fakeContainer     *workerfakes.FakeContainer
			fakeProcessSpec   runtime.ProcessSpec
			fakeResource      *resourcefakes.FakeResource

			// Outputs captured from the RunPutStep call; versionResult and status
			// are copied out of result for convenience.
			versionResult runtime.VersionResult
			status        int
			err           error
			result        worker.PutResult

			// Generic error injected into fakes by the failure-path contexts.
			disasterErr error
		)
1313
1314		BeforeEach(func() {
1315			ctx = context.Background()
1316			owner = new(dbfakes.FakeContainerOwner)
1317			containerSpec = worker.ContainerSpec{}
1318			fakeStrategy = new(workerfakes.FakeContainerPlacementStrategy)
1319			workerSpec = worker.WorkerSpec{}
1320			fakeChosenWorker = new(workerfakes.FakeWorker)
1321			fakeDelegate = new(workerfakes.FakeImageFetchingDelegate)
1322			fakeEventDelegate = new(runtimefakes.FakeStartingEventDelegate)
1323			fakeResourceTypes = atc.VersionedResourceTypes{}
1324			imageSpec = worker.ImageFetcherSpec{
1325				Delegate:      fakeDelegate,
1326				ResourceTypes: fakeResourceTypes,
1327			}
1328
1329			fakeContainer = new(workerfakes.FakeContainer)
1330			disasterErr = errors.New("oh no")
1331			stdout := new(gbytes.Buffer)
1332			stderr := new(gbytes.Buffer)
1333			fakeProcessSpec = runtime.ProcessSpec{
1334				Path:         "/opt/resource/out",
1335				StdoutWriter: stdout,
1336				StderrWriter: stderr,
1337			}
1338			fakeResource = new(resourcefakes.FakeResource)
1339
1340			fakeChosenWorker = new(workerfakes.FakeWorker)
1341			fakeChosenWorker.NameReturns("some-worker")
1342			fakeChosenWorker.SatisfiesReturns(true)
1343			fakeChosenWorker.FindOrCreateContainerReturns(fakeContainer, nil)
1344			fakePool.FindOrChooseWorkerForContainerReturns(fakeChosenWorker, nil)
1345
1346		})
1347
1348		JustBeforeEach(func() {
1349			result, err = client.RunPutStep(
1350				ctx,
1351				logger,
1352				owner,
1353				containerSpec,
1354				workerSpec,
1355				fakeStrategy,
1356				metadata,
1357				imageSpec,
1358				fakeProcessSpec,
1359				fakeEventDelegate,
1360				fakeResource,
1361			)
1362			versionResult = result.VersionResult
1363			status = result.ExitStatus
1364		})
1365
1366		It("finds/chooses a worker", func() {
1367			Expect(fakePool.FindOrChooseWorkerForContainerCallCount()).To(Equal(1))
1368
1369			_, _, actualOwner, actualContainerSpec, actualWorkerSpec, strategy := fakePool.FindOrChooseWorkerForContainerArgsForCall(0)
1370			Expect(actualOwner).To(Equal(owner))
1371			Expect(actualContainerSpec).To(Equal(containerSpec))
1372			Expect(actualWorkerSpec).To(Equal(workerSpec))
1373			Expect(strategy).To(Equal(fakeStrategy))
1374		})
1375
1376		It("invokes the SelectedWorker Event on the delegate", func() {
1377			Expect(fakeEventDelegate.SelectedWorkerCallCount()).Should((Equal(1)))
1378			_, actualWorkerName := fakeEventDelegate.SelectedWorkerArgsForCall(0)
1379			Expect(actualWorkerName).To(Equal(fakeChosenWorker.Name()))
1380		})
1381
1382		Context("worker is chosen", func() {
1383			BeforeEach(func() {
1384				fakePool.FindOrChooseWorkerReturns(fakeChosenWorker, nil)
1385			})
1386			It("finds or creates a put container on that worker", func() {
1387				Expect(fakeChosenWorker.FindOrCreateContainerCallCount()).To(Equal(1))
1388				_, _, actualDelegate, actualOwner, actualMetadata, actualContainerSpec, actualResourceTypes := fakeChosenWorker.FindOrCreateContainerArgsForCall(0)
1389
1390				Expect(actualContainerSpec).To(Equal(containerSpec))
1391				Expect(actualDelegate).To(Equal(fakeDelegate))
1392				Expect(actualOwner).To(Equal(owner))
1393				Expect(actualMetadata).To(Equal(metadata))
1394				Expect(actualResourceTypes).To(Equal(fakeResourceTypes))
1395			})
1396		})
1397
1398		Context("worker selection returns an error", func() {
1399			BeforeEach(func() {
1400				fakePool.FindOrChooseWorkerForContainerReturns(nil, disasterErr)
1401			})
1402
1403			It("returns the error", func() {
1404				Expect(err).To(HaveOccurred())
1405				Expect(err).To(Equal(disasterErr))
1406				Expect(versionResult).To(Equal(runtime.VersionResult{}))
1407			})
1408		})
1409
1410		Context("found a container that has run resource.Put and exited", func() {
1411			BeforeEach(func() {
1412				fakeChosenWorker.FindOrCreateContainerReturns(fakeContainer, nil)
1413				fakeContainer.PropertyStub = func(prop string) (result string, err error) {
1414					if prop == "concourse:exit-status" {
1415						return "8", nil
1416					}
1417					return "", errors.New("unhandled property")
1418				}
1419			})
1420
1421			It("does not invoke resource.Put", func() {
1422				Expect(fakeResource.PutCallCount()).To(Equal(0))
1423			})
1424
1425			It("returns result of container process", func() {
1426				Expect(err).ToNot(HaveOccurred())
1427				Expect(status).To(Equal(8))
1428			})
1429		})
1430
1431		Context("calling resource.Put", func() {
1432			BeforeEach(func() {
1433				fakeChosenWorker.FindOrCreateContainerReturns(fakeContainer, nil)
1434				fakeContainer.PropertyReturns("0", fmt.Errorf("property not found"))
1435			})
1436
1437			It("invokes the Starting Event on the delegate", func() {
1438				Expect(fakeEventDelegate.StartingCallCount()).Should((Equal(1)))
1439			})
1440
1441			It("calls resource.Put with the correct ctx, processSpec and container", func() {
1442				actualCtx, actualProcessSpec, actualContainer := fakeResource.PutArgsForCall(0)
1443				Expect(actualCtx).To(Equal(ctx))
1444				Expect(actualProcessSpec).To(Equal(fakeProcessSpec))
1445				Expect(actualContainer).To(Equal(fakeContainer))
1446			})
1447
1448			Context("when PUT returns an error", func() {
1449
1450				Context("when the error is ErrResourceScriptFailed", func() {
1451					var (
1452						scriptFailErr runtime.ErrResourceScriptFailed
1453					)
1454					BeforeEach(func() {
1455						scriptFailErr = runtime.ErrResourceScriptFailed{
1456							ExitStatus: 10,
1457						}
1458
1459						fakeResource.PutReturns(
1460							runtime.VersionResult{},
1461							scriptFailErr,
1462						)
1463					})
1464
1465					It("returns a PutResult with the exit status from ErrResourceScriptFailed", func() {
1466						Expect(status).To(Equal(10))
1467						Expect(err).To(BeNil())
1468					})
1469				})
1470
1471				Context("when the error is NOT ErrResourceScriptFailed", func() {
1472					BeforeEach(func() {
1473						fakeResource.PutReturns(
1474							runtime.VersionResult{},
1475							disasterErr,
1476						)
1477					})
1478
1479					It("returns an error", func() {
1480						Expect(err).To(Equal(disasterErr))
1481					})
1482
1483				})
1484			})
1485
1486			Context("when PUT succeeds", func() {
1487				var expectedVersionResult runtime.VersionResult
1488				BeforeEach(func() {
1489					expectedVersionResult = runtime.VersionResult{
1490						Version:  atc.Version(map[string]string{"foo": "bar"}),
1491						Metadata: nil,
1492					}
1493
1494					fakeResource.PutReturns(expectedVersionResult, nil)
1495				})
1496				It("returns the correct VersionResult and ExitStatus", func() {
1497					Expect(err).To(BeNil())
1498					Expect(status).To(Equal(0))
1499					Expect(versionResult).To(Equal(expectedVersionResult))
1500				})
1501			})
1502		})
1503
1504		Context("worker.FindOrCreateContainer errored", func() {
1505			BeforeEach(func() {
1506				fakeChosenWorker.FindOrCreateContainerReturns(nil, disasterErr)
1507			})
1508
1509			It("returns the error immediately", func() {
1510				Expect(err).To(HaveOccurred())
1511				Expect(err).To(Equal(disasterErr))
1512				Expect(versionResult).To(Equal(runtime.VersionResult{}))
1513			})
1514		})
1515	})
1516})
1517