1package db_test
2
3import (
4	"time"
5
6	sq "github.com/Masterminds/squirrel"
7	"github.com/concourse/concourse/atc"
8	"github.com/concourse/concourse/atc/db"
9	"github.com/lib/pq"
10	. "github.com/onsi/ginkgo"
11	. "github.com/onsi/gomega"
12)
13
14var _ = Describe("VolumeRepository", func() {
15	var (
16		team2             db.Team
17		usedResourceCache db.UsedResourceCache
18		build             db.Build
19	)
20
21	BeforeEach(func() {
22		var err error
23		build, err = defaultTeam.CreateOneOffBuild()
24		Expect(err).ToNot(HaveOccurred())
25
26		usedResourceCache, err = resourceCacheFactory.FindOrCreateResourceCache(
27			db.ForBuild(build.ID()),
28			"some-type",
29			atc.Version{"some": "version"},
30			atc.Source{
31				"some": "source",
32			},
33			atc.Params{"some": "params"},
34			atc.VersionedResourceTypes{
35				atc.VersionedResourceType{
36					ResourceType: atc.ResourceType{
37						Name: "some-type",
38						Type: "some-base-resource-type",
39						Source: atc.Source{
40							"some-type": "source",
41						},
42					},
43					Version: atc.Version{"some-type": "version"},
44				},
45			},
46		)
47		Expect(err).NotTo(HaveOccurred())
48	})
49
50	Describe("GetTeamVolumes", func() {
51		var (
52			team1handles []string
53			team2handles []string
54		)
55
56		It("returns task cache volumes", func() {
57			taskCache, err := taskCacheFactory.FindOrCreate(defaultJob.ID(), "some-step", "some-path")
58			Expect(err).NotTo(HaveOccurred())
59
60			usedWorkerTaskCache, err := workerTaskCacheFactory.FindOrCreate(db.WorkerTaskCache{
61				TaskCache:  taskCache,
62				WorkerName: defaultWorker.Name(),
63			})
64			Expect(err).NotTo(HaveOccurred())
65
66			creatingVolume, err := volumeRepository.CreateTaskCacheVolume(defaultTeam.ID(), usedWorkerTaskCache)
67			Expect(err).NotTo(HaveOccurred())
68
69			createdVolume, err := creatingVolume.Created()
70			Expect(err).NotTo(HaveOccurred())
71
72			volumes, err := volumeRepository.GetTeamVolumes(defaultTeam.ID())
73			Expect(err).NotTo(HaveOccurred())
74
75			Expect(volumes).To(HaveLen(1))
76			Expect(volumes[0].Handle()).To(Equal(createdVolume.Handle()))
77			Expect(volumes[0].Type()).To(Equal(db.VolumeTypeTaskCache))
78		})
79
80		Context("with container volumes", func() {
81			JustBeforeEach(func() {
82				creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{
83					Type:     "task",
84					StepName: "some-task",
85				})
86				Expect(err).ToNot(HaveOccurred())
87
88				team1handles = []string{}
89				team2handles = []string{}
90
91				team2, err = teamFactory.CreateTeam(atc.Team{Name: "some-other-defaultTeam"})
92				Expect(err).ToNot(HaveOccurred())
93
94				creatingVolume1, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-1")
95				Expect(err).NotTo(HaveOccurred())
96				createdVolume1, err := creatingVolume1.Created()
97				Expect(err).NotTo(HaveOccurred())
98				team1handles = append(team1handles, createdVolume1.Handle())
99
100				creatingVolume2, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-2")
101				Expect(err).NotTo(HaveOccurred())
102				createdVolume2, err := creatingVolume2.Created()
103				Expect(err).NotTo(HaveOccurred())
104				team1handles = append(team1handles, createdVolume2.Handle())
105
106				creatingVolume3, err := volumeRepository.CreateContainerVolume(team2.ID(), defaultWorker.Name(), creatingContainer, "some-path-3")
107				Expect(err).NotTo(HaveOccurred())
108				createdVolume3, err := creatingVolume3.Created()
109				Expect(err).NotTo(HaveOccurred())
110				team2handles = append(team2handles, createdVolume3.Handle())
111			})
112
113			It("returns only the matching defaultTeam's volumes", func() {
114				createdVolumes, err := volumeRepository.GetTeamVolumes(defaultTeam.ID())
115				Expect(err).NotTo(HaveOccurred())
116				createdHandles := []string{}
117				for _, vol := range createdVolumes {
118					createdHandles = append(createdHandles, vol.Handle())
119				}
120				Expect(createdHandles).To(Equal(team1handles))
121
122				createdVolumes2, err := volumeRepository.GetTeamVolumes(team2.ID())
123				Expect(err).NotTo(HaveOccurred())
124				createdHandles2 := []string{}
125				for _, vol := range createdVolumes2 {
126					createdHandles2 = append(createdHandles2, vol.Handle())
127				}
128				Expect(createdHandles2).To(Equal(team2handles))
129			})
130
131			Context("when worker is stalled", func() {
132				BeforeEach(func() {
133					var err error
134					defaultWorker, err = workerFactory.SaveWorker(defaultWorkerPayload, -10*time.Minute)
135					Expect(err).NotTo(HaveOccurred())
136					stalledWorkers, err := workerLifecycle.StallUnresponsiveWorkers()
137					Expect(err).NotTo(HaveOccurred())
138					Expect(stalledWorkers).To(ContainElement(defaultWorker.Name()))
139				})
140
141				It("returns volumes", func() {
142					createdVolumes, err := volumeRepository.GetTeamVolumes(defaultTeam.ID())
143					Expect(err).NotTo(HaveOccurred())
144					createdHandles := []string{}
145					for _, vol := range createdVolumes {
146						createdHandles = append(createdHandles, vol.Handle())
147					}
148					Expect(createdHandles).To(Equal(team1handles))
149
150					createdVolumes2, err := volumeRepository.GetTeamVolumes(team2.ID())
151					Expect(err).NotTo(HaveOccurred())
152					createdHandles2 := []string{}
153					for _, vol := range createdVolumes2 {
154						createdHandles2 = append(createdHandles2, vol.Handle())
155					}
156					Expect(createdHandles2).To(Equal(team2handles))
157				})
158			})
159		})
160	})
161
162	Describe("GetOrphanedVolumes", func() {
163		var (
164			expectedCreatedHandles    []string
165			expectedDestroyingHandles []string
166			certsVolumeHandle         string
167		)
168
169		BeforeEach(func() {
170			creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{
171				Type:     "task",
172				StepName: "some-task",
173			})
174			Expect(err).ToNot(HaveOccurred())
175			expectedCreatedHandles = []string{}
176			expectedDestroyingHandles = []string{}
177
178			creatingVolume1, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-1")
179			Expect(err).NotTo(HaveOccurred())
180			createdVolume1, err := creatingVolume1.Created()
181			Expect(err).NotTo(HaveOccurred())
182			expectedCreatedHandles = append(expectedCreatedHandles, createdVolume1.Handle())
183
184			creatingVolume2, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-2")
185			Expect(err).NotTo(HaveOccurred())
186			createdVolume2, err := creatingVolume2.Created()
187			Expect(err).NotTo(HaveOccurred())
188			expectedCreatedHandles = append(expectedCreatedHandles, createdVolume2.Handle())
189
190			creatingVolume3, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-3")
191			Expect(err).NotTo(HaveOccurred())
192			createdVolume3, err := creatingVolume3.Created()
193			Expect(err).NotTo(HaveOccurred())
194			destroyingVolume3, err := createdVolume3.Destroying()
195			Expect(err).NotTo(HaveOccurred())
196			expectedDestroyingHandles = append(expectedDestroyingHandles, destroyingVolume3.Handle())
197
198			creatingVolumeOtherWorker, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), otherWorker.Name(), creatingContainer, "some-path-other-1")
199			Expect(err).NotTo(HaveOccurred())
200			createdVolumeOtherWorker, err := creatingVolumeOtherWorker.Created()
201			Expect(err).NotTo(HaveOccurred())
202			expectedCreatedHandles = append(expectedCreatedHandles, createdVolumeOtherWorker.Handle())
203
204			resourceCacheVolume, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-4")
205			Expect(err).NotTo(HaveOccurred())
206			expectedCreatedHandles = append(expectedCreatedHandles, resourceCacheVolume.Handle())
207
208			resourceCacheVolumeCreated, err := resourceCacheVolume.Created()
209			Expect(err).NotTo(HaveOccurred())
210
211			err = resourceCacheVolumeCreated.InitializeResourceCache(usedResourceCache)
212			Expect(err).NotTo(HaveOccurred())
213
214			artifactVolume, err := volumeRepository.CreateVolume(defaultTeam.ID(), defaultWorker.Name(), db.VolumeTypeArtifact)
215			Expect(err).NotTo(HaveOccurred())
216			expectedCreatedHandles = append(expectedCreatedHandles, artifactVolume.Handle())
217
218			_, err = artifactVolume.Created()
219			Expect(err).NotTo(HaveOccurred())
220
221			usedWorkerBaseResourceType, found, err := workerBaseResourceTypeFactory.Find(defaultWorkerResourceType.Type, defaultWorker)
222			Expect(err).ToNot(HaveOccurred())
223			Expect(found).To(BeTrue())
224
225			baseResourceTypeVolume, err := volumeRepository.CreateBaseResourceTypeVolume(usedWorkerBaseResourceType)
226			Expect(err).NotTo(HaveOccurred())
227
228			oldResourceTypeVolume, err := baseResourceTypeVolume.Created()
229			Expect(err).NotTo(HaveOccurred())
230			expectedCreatedHandles = append(expectedCreatedHandles, oldResourceTypeVolume.Handle())
231
232			newVersion := defaultWorkerResourceType
233			newVersion.Version = "some-new-brt-version"
234
235			newWorker := defaultWorkerPayload
236			newWorker.ResourceTypes = []atc.WorkerResourceType{newVersion}
237
238			defaultWorker, err = workerFactory.SaveWorker(newWorker, 0)
239			Expect(err).ToNot(HaveOccurred())
240
241			tx, err := dbConn.Begin()
242			Expect(err).NotTo(HaveOccurred())
243			workerResourceCerts, err := db.WorkerResourceCerts{
244				WorkerName: defaultWorker.Name(),
245				CertsPath:  "/etc/blah/blah/certs",
246			}.FindOrCreate(tx)
247			Expect(err).NotTo(HaveOccurred())
248			err = tx.Commit()
249			Expect(err).NotTo(HaveOccurred())
250
251			certsVolume, err := volumeRepository.CreateResourceCertsVolume(defaultWorker.Name(), workerResourceCerts)
252			Expect(err).NotTo(HaveOccurred())
253
254			certsVolumeHandle = certsVolume.Handle()
255
256			deleted, err := build.Delete()
257			Expect(err).NotTo(HaveOccurred())
258			Expect(deleted).To(BeTrue())
259
260			deleteTx, err := dbConn.Begin()
261			Expect(err).ToNot(HaveOccurred())
262			deleted, err = usedResourceCache.Destroy(deleteTx)
263			Expect(err).NotTo(HaveOccurred())
264			Expect(deleted).To(BeTrue())
265			Expect(deleteTx.Commit()).To(Succeed())
266
267			createdContainer, err := creatingContainer.Created()
268			Expect(err).NotTo(HaveOccurred())
269			destroyingContainer, err := createdContainer.Destroying()
270			Expect(err).NotTo(HaveOccurred())
271			destroyed, err := destroyingContainer.Destroy()
272			Expect(err).NotTo(HaveOccurred())
273			Expect(destroyed).To(BeTrue())
274		})
275
276		It("returns orphaned volumes", func() {
277			createdVolumes, err := volumeRepository.GetOrphanedVolumes()
278			Expect(err).NotTo(HaveOccurred())
279			createdHandles := []string{}
280
281			for _, vol := range createdVolumes {
282				createdHandles = append(createdHandles, vol.Handle())
283			}
284			Expect(createdHandles).To(ConsistOf(expectedCreatedHandles))
285			Expect(createdHandles).ToNot(ContainElement(certsVolumeHandle))
286		})
287
288		Context("when worker is stalled", func() {
289			BeforeEach(func() {
290				var err error
291				defaultWorker, err = workerFactory.SaveWorker(defaultWorkerPayload, -11*time.Minute)
292				Expect(err).NotTo(HaveOccurred())
293				stalledWorkers, err := workerLifecycle.StallUnresponsiveWorkers()
294				Expect(err).NotTo(HaveOccurred())
295				Expect(stalledWorkers).To(ContainElement(defaultWorker.Name()))
296			})
297
298			It("does not return volumes from stalled worker", func() {
299				createdVolumes, err := volumeRepository.GetOrphanedVolumes()
300				Expect(err).NotTo(HaveOccurred())
301
302				for _, v := range createdVolumes {
303					Expect(v.WorkerName()).ToNot(Equal(defaultWorker.Name()))
304				}
305			})
306		})
307
308		Context("when worker is landed", func() {
309			BeforeEach(func() {
310				err := defaultWorker.Land()
311				Expect(err).NotTo(HaveOccurred())
312				landedWorkers, err := workerLifecycle.LandFinishedLandingWorkers()
313				Expect(err).NotTo(HaveOccurred())
314				Expect(landedWorkers).To(ContainElement(defaultWorker.Name()))
315			})
316
317			It("does not return volumes for the worker", func() {
318				createdVolumes, err := volumeRepository.GetOrphanedVolumes()
319				Expect(err).NotTo(HaveOccurred())
320
321				for _, v := range createdVolumes {
322					Expect(v.WorkerName()).ToNot(Equal(defaultWorker.Name()))
323				}
324			})
325		})
326	})
327
328	Describe("DestroyFailedVolumes", func() {
329		BeforeEach(func() {
330			creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{
331				Type:     "task",
332				StepName: "some-task",
333			})
334			Expect(err).ToNot(HaveOccurred())
335
336			creatingVolume1, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-1")
337			Expect(err).NotTo(HaveOccurred())
338			_, err = creatingVolume1.Failed()
339			Expect(err).NotTo(HaveOccurred())
340		})
341
342		It("returns length of failed volumes", func() {
343			failedVolumes, err := volumeRepository.DestroyFailedVolumes()
344			Expect(err).NotTo(HaveOccurred())
345			Expect(failedVolumes).To(Equal(1))
346		})
347	})
348
349	Describe("GetDestroyingVolumes", func() {
350		var expectedDestroyingHandles []string
351		var destroyingVol db.DestroyingVolume
352
353		Context("when worker has detroying volumes", func() {
354			BeforeEach(func() {
355				creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{
356					Type:     "task",
357					StepName: "some-task",
358				})
359				Expect(err).ToNot(HaveOccurred())
360
361				expectedDestroyingHandles = []string{}
362
363				creatingVol, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-1")
364				Expect(err).NotTo(HaveOccurred())
365
366				createdVol, err := creatingVol.Created()
367				Expect(err).NotTo(HaveOccurred())
368
369				destroyingVol, err = createdVol.Destroying()
370				Expect(err).NotTo(HaveOccurred())
371
372				expectedDestroyingHandles = append(expectedDestroyingHandles, destroyingVol.Handle())
373			})
374
375			It("returns destroying volumes", func() {
376				destroyingVolumes, err := volumeRepository.GetDestroyingVolumes(defaultWorker.Name())
377				Expect(err).NotTo(HaveOccurred())
378				Expect(destroyingVolumes).To(Equal(expectedDestroyingHandles))
379			})
380			Context("when worker doesn't have detroying volume", func() {
381				BeforeEach(func() {
382					deleted, err := destroyingVol.Destroy()
383					Expect(err).NotTo(HaveOccurred())
384					Expect(deleted).To(BeTrue())
385				})
386
387				It("returns empty volumes", func() {
388					destroyingVolumes, err := volumeRepository.GetDestroyingVolumes(defaultWorker.Name())
389					Expect(err).NotTo(HaveOccurred())
390					Expect(destroyingVolumes).To(BeEmpty())
391				})
392			})
393		})
394	})
395
396	Describe("CreateBaseResourceTypeVolume", func() {
397		var usedWorkerBaseResourceType *db.UsedWorkerBaseResourceType
398		BeforeEach(func() {
399			workerBaseResourceTypeFactory := db.NewWorkerBaseResourceTypeFactory(dbConn)
400			var err error
401			var found bool
402			usedWorkerBaseResourceType, found, err = workerBaseResourceTypeFactory.Find("some-base-resource-type", defaultWorker)
403			Expect(err).NotTo(HaveOccurred())
404			Expect(found).To(BeTrue())
405		})
406
407		It("creates a CreatingVolume with no team ID set", func() {
408			volume, err := volumeRepository.CreateBaseResourceTypeVolume(usedWorkerBaseResourceType)
409			Expect(err).NotTo(HaveOccurred())
410			var teamID int
411			err = psql.Select("team_id").From("volumes").
412				Where(sq.Eq{"handle": volume.Handle()}).RunWith(dbConn).QueryRow().Scan(&teamID)
413			Expect(err).To(HaveOccurred())
414			Expect(err.Error()).To(ContainSubstring("Scan error"))
415		})
416	})
417
418	Describe("CreateVolume", func() {
419		It("creates a CreatingVolume of the given type with a teamID", func() {
420			volume, err := volumeRepository.CreateVolume(defaultTeam.ID(), defaultWorker.Name(), db.VolumeTypeArtifact)
421			Expect(err).NotTo(HaveOccurred())
422			var teamID int
423			var workerName string
424			err = psql.Select("team_id, worker_name").From("volumes").
425				Where(sq.Eq{"handle": volume.Handle()}).RunWith(dbConn).QueryRow().Scan(&teamID, &workerName)
426			Expect(err).NotTo(HaveOccurred())
427			Expect(teamID).To(Equal(defaultTeam.ID()))
428			Expect(workerName).To(Equal(defaultWorker.Name()))
429		})
430	})
431
432	Describe("FindBaseResourceTypeVolume", func() {
433		var usedWorkerBaseResourceType *db.UsedWorkerBaseResourceType
434		BeforeEach(func() {
435			workerBaseResourceTypeFactory := db.NewWorkerBaseResourceTypeFactory(dbConn)
436			var err error
437			var found bool
438			usedWorkerBaseResourceType, found, err = workerBaseResourceTypeFactory.Find("some-base-resource-type", defaultWorker)
439			Expect(err).NotTo(HaveOccurred())
440			Expect(found).To(BeTrue())
441		})
442
443		Context("when there is a created volume for base resource type", func() {
444			var existingVolume db.CreatedVolume
445
446			BeforeEach(func() {
447				var err error
448				volume, err := volumeRepository.CreateBaseResourceTypeVolume(usedWorkerBaseResourceType)
449				Expect(err).NotTo(HaveOccurred())
450				existingVolume, err = volume.Created()
451				Expect(err).NotTo(HaveOccurred())
452			})
453
454			It("returns created volume", func() {
455				creatingVolume, createdVolume, err := volumeRepository.FindBaseResourceTypeVolume(usedWorkerBaseResourceType)
456				Expect(err).NotTo(HaveOccurred())
457				Expect(creatingVolume).To(BeNil())
458				Expect(createdVolume).ToNot(BeNil())
459				Expect(createdVolume.Handle()).To(Equal(existingVolume.Handle()))
460			})
461		})
462
463		Context("when there is a creating volume for base resource type", func() {
464			var existingVolume db.CreatingVolume
465
466			BeforeEach(func() {
467				var err error
468				existingVolume, err = volumeRepository.CreateBaseResourceTypeVolume(usedWorkerBaseResourceType)
469				Expect(err).NotTo(HaveOccurred())
470			})
471
472			It("returns creating volume", func() {
473				creatingVolume, createdVolume, err := volumeRepository.FindBaseResourceTypeVolume(usedWorkerBaseResourceType)
474				Expect(err).NotTo(HaveOccurred())
475				Expect(creatingVolume).ToNot(BeNil())
476				Expect(creatingVolume.Handle()).To(Equal(existingVolume.Handle()))
477				Expect(createdVolume).To(BeNil())
478			})
479		})
480	})
481
482	Describe("FindResourceCacheVolume", func() {
483		var usedResourceCache db.UsedResourceCache
484
485		BeforeEach(func() {
486			build, err := defaultPipeline.CreateOneOffBuild()
487			Expect(err).NotTo(HaveOccurred())
488
489			usedResourceCache, err = resourceCacheFactory.FindOrCreateResourceCache(
490				db.ForBuild(build.ID()),
491				"some-type",
492				atc.Version{"some": "version"},
493				atc.Source{
494					"some": "source",
495				},
496				atc.Params{"some": "params"},
497				atc.VersionedResourceTypes{
498					atc.VersionedResourceType{
499						ResourceType: atc.ResourceType{
500							Name: "some-type",
501							Type: "some-base-resource-type",
502							Source: atc.Source{
503								"some-type": "source",
504							},
505						},
506						Version: atc.Version{"some-type": "version"},
507					},
508				},
509			)
510			Expect(err).ToNot(HaveOccurred())
511		})
512
513		Context("when there is a created volume for resource cache", func() {
514			var existingVolume db.CreatedVolume
515
516			BeforeEach(func() {
517				var err error
518				creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{
519					Type:     "get",
520					StepName: "some-resource",
521				})
522				Expect(err).ToNot(HaveOccurred())
523
524				resourceCacheVolume, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-4")
525				Expect(err).NotTo(HaveOccurred())
526
527				existingVolume, err = resourceCacheVolume.Created()
528				Expect(err).NotTo(HaveOccurred())
529
530				err = existingVolume.InitializeResourceCache(usedResourceCache)
531				Expect(err).NotTo(HaveOccurred())
532			})
533
534			It("returns created volume", func() {
535				createdVolume, found, err := volumeRepository.FindResourceCacheVolume(defaultWorker.Name(), usedResourceCache)
536				Expect(err).NotTo(HaveOccurred())
537				Expect(createdVolume.Handle()).To(Equal(existingVolume.Handle()))
538				Expect(found).To(BeTrue())
539			})
540		})
541	})
542
543	Describe("RemoveDestroyingVolumes", func() {
544		var failedErr error
545		var numDeleted int
546		var handles []string
547
548		JustBeforeEach(func() {
549			numDeleted, failedErr = volumeRepository.RemoveDestroyingVolumes(defaultWorker.Name(), handles)
550		})
551
552		Context("when there are volumes to destroy", func() {
553
554			Context("when volume is in destroying state", func() {
555				BeforeEach(func() {
556					handles = []string{"some-handle1", "some-handle2"}
557					result, err := psql.Insert("volumes").SetMap(map[string]interface{}{
558						"state":       "destroying",
559						"handle":      "123-456-abc-def",
560						"worker_name": defaultWorker.Name(),
561					}).RunWith(dbConn).Exec()
562
563					Expect(err).ToNot(HaveOccurred())
564					Expect(result.RowsAffected()).To(Equal(int64(1)))
565				})
566				It("should destroy", func() {
567					result, err := psql.Select("*").From("volumes").
568						Where(sq.Eq{"handle": "123-456-abc-def"}).RunWith(dbConn).Exec()
569
570					Expect(err).ToNot(HaveOccurred())
571					Expect(result.RowsAffected()).To(Equal(int64(0)))
572				})
573				It("returns the correct number of rows removed", func() {
574					Expect(numDeleted).To(Equal(1))
575				})
576				It("does not return an error", func() {
577					Expect(failedErr).ToNot(HaveOccurred())
578				})
579			})
580
581			Context("when handles are empty list", func() {
582				BeforeEach(func() {
583					handles = []string{}
584					result, err := psql.Insert("volumes").SetMap(map[string]interface{}{
585						"state":       "destroying",
586						"handle":      "123-456-abc-def",
587						"worker_name": defaultWorker.Name(),
588					}).RunWith(dbConn).Exec()
589
590					Expect(err).ToNot(HaveOccurred())
591					Expect(result.RowsAffected()).To(Equal(int64(1)))
592				})
593				It("should destroy", func() {
594					result, err := psql.Select("*").From("volumes").
595						Where(sq.Eq{"handle": "123-456-abc-def"}).RunWith(dbConn).Exec()
596
597					Expect(err).ToNot(HaveOccurred())
598					Expect(result.RowsAffected()).To(Equal(int64(0)))
599				})
600				It("returns the correct number of rows removed", func() {
601					Expect(numDeleted).To(Equal(1))
602				})
603				It("does not return an error", func() {
604					Expect(failedErr).ToNot(HaveOccurred())
605				})
606			})
607
608			Context("when volume is in create/creating state", func() {
609				BeforeEach(func() {
610					handles = []string{"some-handle1", "some-handle2"}
611					result, err := psql.Insert("volumes").SetMap(map[string]interface{}{
612						"state":       "creating",
613						"handle":      "123-456-abc-def",
614						"worker_name": defaultWorker.Name(),
615					}).RunWith(dbConn).Exec()
616
617					Expect(err).ToNot(HaveOccurred())
618					Expect(result.RowsAffected()).To(Equal(int64(1)))
619				})
620				It("should not destroy", func() {
621					result, err := psql.Select("*").From("volumes").
622						Where(sq.Eq{"handle": "123-456-abc-def"}).RunWith(dbConn).Exec()
623
624					Expect(err).ToNot(HaveOccurred())
625					Expect(result.RowsAffected()).To(Equal(int64(1)))
626				})
627				It("returns the correct number of rows removed", func() {
628					Expect(numDeleted).To(Equal(0))
629				})
630				It("does not return an error", func() {
631					Expect(failedErr).ToNot(HaveOccurred())
632				})
633			})
634		})
635
636		Context("when there are no volumes to destroy", func() {
637			BeforeEach(func() {
638				handles = []string{"some-handle1", "some-handle2"}
639
640				result, err := psql.Insert("volumes").SetMap(
641					map[string]interface{}{
642						"state":       "destroying",
643						"handle":      "some-handle1",
644						"worker_name": defaultWorker.Name(),
645					},
646				).RunWith(dbConn).Exec()
647				Expect(err).ToNot(HaveOccurred())
648				Expect(result.RowsAffected()).To(Equal(int64(1)))
649
650				result, err = psql.Insert("volumes").SetMap(
651					map[string]interface{}{
652						"state":       "destroying",
653						"handle":      "some-handle2",
654						"worker_name": defaultWorker.Name(),
655					},
656				).RunWith(dbConn).Exec()
657				Expect(err).ToNot(HaveOccurred())
658				Expect(result.RowsAffected()).To(Equal(int64(1)))
659			})
660
661			It("doesn't destroy volumes that are in handles", func() {
662				result, err := psql.Select("*").From("volumes").
663					Where(sq.Eq{"handle": handles}).RunWith(dbConn).Exec()
664
665				Expect(err).ToNot(HaveOccurred())
666				Expect(result.RowsAffected()).To(Equal(int64(2)))
667			})
668
669			It("does not return an error", func() {
670				Expect(failedErr).ToNot(HaveOccurred())
671			})
672			It("returns the correct number of rows removed", func() {
673				Expect(numDeleted).To(Equal(0))
674			})
675		})
676	})
677
678	Describe("RemoveMissingVolumes", func() {
679		var (
680			today        time.Time
681			gracePeriod  time.Duration
682			rowsAffected int
683			err          error
684		)
685
686		JustBeforeEach(func() {
687			rowsAffected, err = volumeRepository.RemoveMissingVolumes(gracePeriod)
688		})
689
690		Context("when there are multiple volumes with varying missing since times", func() {
691			BeforeEach(func() {
692				today = time.Now()
693
694				_, err = psql.Insert("volumes").SetMap(map[string]interface{}{
695					"handle":      "some-handle-1",
696					"state":       db.VolumeStateCreated,
697					"worker_name": defaultWorker.Name(),
698				}).RunWith(dbConn).Exec()
699				Expect(err).NotTo(HaveOccurred())
700
701				_, err = psql.Insert("volumes").SetMap(map[string]interface{}{
702					"handle":        "some-handle-2",
703					"state":         db.VolumeStateCreated,
704					"worker_name":   otherWorker.Name(),
705					"missing_since": today,
706				}).RunWith(dbConn).Exec()
707				Expect(err).NotTo(HaveOccurred())
708
709				_, err = psql.Insert("volumes").SetMap(map[string]interface{}{
710					"handle":        "some-handle-3",
711					"state":         db.VolumeStateFailed,
712					"worker_name":   otherWorker.Name(),
713					"missing_since": today.Add(-5 * time.Minute),
714				}).RunWith(dbConn).Exec()
715				Expect(err).NotTo(HaveOccurred())
716
717				_, err = psql.Insert("volumes").SetMap(map[string]interface{}{
718					"handle":        "some-handle-4",
719					"state":         db.VolumeStateDestroying,
720					"worker_name":   defaultWorker.Name(),
721					"missing_since": today.Add(-10 * time.Minute),
722				}).RunWith(dbConn).Exec()
723				Expect(err).NotTo(HaveOccurred())
724			})
725
726			Context("when no created/failed volumes have expired", func() {
727				BeforeEach(func() {
728					gracePeriod = 7 * time.Minute
729				})
730
731				It("affects no volumes", func() {
732					Expect(err).ToNot(HaveOccurred())
733					Expect(rowsAffected).To(Equal(0))
734				})
735			})
736
737			Context("when some created/failed volumes have expired", func() {
738				BeforeEach(func() {
739					gracePeriod = 3 * time.Minute
740				})
741
742				It("affects some volumes", func() {
743					Expect(err).ToNot(HaveOccurred())
744					Expect(rowsAffected).To(Equal(1))
745				})
746
747				It("affects the right volumes", func() {
748					result, err := psql.Select("*").From("volumes").
749						RunWith(dbConn).Exec()
750					Expect(err).ToNot(HaveOccurred())
751					Expect(result.RowsAffected()).To(Equal(int64(3)))
752
753					result, err = psql.Select("*").From("volumes").
754						Where(sq.Eq{"handle": "some-handle-1"}).RunWith(dbConn).Exec()
755					Expect(err).ToNot(HaveOccurred())
756					Expect(result.RowsAffected()).To(Equal(int64(1)))
757
758					result, err = psql.Select("*").From("volumes").
759						Where(sq.Eq{"handle": "some-handle-2"}).RunWith(dbConn).Exec()
760					Expect(err).ToNot(HaveOccurred())
761					Expect(result.RowsAffected()).To(Equal(int64(1)))
762
763					result, err = psql.Select("*").From("volumes").
764						Where(sq.Eq{"handle": "some-handle-4"}).RunWith(dbConn).Exec()
765					Expect(err).ToNot(HaveOccurred())
766					Expect(result.RowsAffected()).To(Equal(int64(1)))
767				})
768			})
769		})
770
771		Context("when there is a missing parent volume", func() {
772			BeforeEach(func() {
773				today = time.Now()
774
775				_, err = psql.Insert("volumes").SetMap(map[string]interface{}{
776					"handle":      "alive-handle",
777					"state":       db.VolumeStateCreated,
778					"worker_name": defaultWorker.Name(),
779				}).RunWith(dbConn).Exec()
780				Expect(err).NotTo(HaveOccurred())
781
782				var parentID int
783				err = psql.Insert("volumes").SetMap(map[string]interface{}{
784					"handle":        "parent-handle",
785					"state":         db.VolumeStateCreated,
786					"worker_name":   defaultWorker.Name(),
787					"missing_since": today.Add(-10 * time.Minute),
788				}).Suffix("RETURNING id").RunWith(dbConn).QueryRow().Scan(&parentID)
789				Expect(err).NotTo(HaveOccurred())
790
791				_, err = psql.Insert("volumes").SetMap(map[string]interface{}{
792					"handle":      "child-handle",
793					"state":       db.VolumeStateCreated,
794					"worker_name": defaultWorker.Name(),
795					"parent_id":   parentID,
796				}).RunWith(dbConn).Exec()
797				Expect(err).NotTo(HaveOccurred())
798
799				gracePeriod = 3 * time.Minute
800			})
801
802			It("affects some volumes", func() {
803				Expect(err).ToNot(HaveOccurred())
804				Expect(rowsAffected).To(Equal(2))
805			})
806
807			It("removes the child and missing parent volume", func() {
808				var volumeCount int
809				err = psql.Select("COUNT(id)").From("volumes").RunWith(dbConn).QueryRow().Scan(&volumeCount)
810				Expect(err).ToNot(HaveOccurred())
811				Expect(volumeCount).To(Equal(1))
812
813				result, err := psql.Select("*").From("volumes").
814					Where(sq.Eq{"handle": "parent-handle"}).RunWith(dbConn).Exec()
815				Expect(err).ToNot(HaveOccurred())
816				Expect(result.RowsAffected()).To(Equal(int64(0)))
817
818				result, err = psql.Select("*").From("volumes").
819					Where(sq.Eq{"handle": "child-handle"}).RunWith(dbConn).Exec()
820				Expect(err).ToNot(HaveOccurred())
821				Expect(result.RowsAffected()).To(Equal(int64(0)))
822			})
823		})
824	})
825
826	Describe("UpdateVolumesMissingSince", func() {
827		var (
828			today        time.Time
829			err          error
830			handles      []string
831			missingSince pq.NullTime
832		)
833
834		BeforeEach(func() {
835			result, err := psql.Insert("volumes").SetMap(map[string]interface{}{
836				"state":       db.VolumeStateDestroying,
837				"handle":      "some-handle1",
838				"worker_name": defaultWorker.Name(),
839			}).RunWith(dbConn).Exec()
840
841			Expect(err).ToNot(HaveOccurred())
842			Expect(result.RowsAffected()).To(Equal(int64(1)))
843
844			result, err = psql.Insert("volumes").SetMap(map[string]interface{}{
845				"state":       db.VolumeStateDestroying,
846				"handle":      "some-handle2",
847				"worker_name": defaultWorker.Name(),
848			}).RunWith(dbConn).Exec()
849
850			Expect(err).ToNot(HaveOccurred())
851			Expect(result.RowsAffected()).To(Equal(int64(1)))
852
853			today = time.Date(2018, 9, 24, 0, 0, 0, 0, time.UTC)
854
855			result, err = psql.Insert("volumes").SetMap(map[string]interface{}{
856				"state":         db.VolumeStateCreated,
857				"handle":        "some-handle3",
858				"worker_name":   defaultWorker.Name(),
859				"missing_since": today,
860			}).RunWith(dbConn).Exec()
861
862			Expect(err).ToNot(HaveOccurred())
863			Expect(result.RowsAffected()).To(Equal(int64(1)))
864		})
865
866		JustBeforeEach(func() {
867			err = volumeRepository.UpdateVolumesMissingSince(defaultWorker.Name(), handles)
868			Expect(err).ToNot(HaveOccurred())
869		})
870
871		Context("when the reported handles is a subset", func() {
872			BeforeEach(func() {
873				handles = []string{"some-handle1"}
874			})
875
876			Context("having the volumes in the creating state in the db", func() {
877				BeforeEach(func() {
878					result, err := psql.Update("volumes").
879						Where(sq.Eq{"handle": "some-handle3"}).
880						SetMap(map[string]interface{}{
881							"state":         db.VolumeStateCreating,
882							"missing_since": nil,
883						}).RunWith(dbConn).Exec()
884					Expect(err).NotTo(HaveOccurred())
885					Expect(result.RowsAffected()).To(Equal(int64(1)))
886				})
887
888				It("does not mark as missing", func() {
889					err = psql.Select("missing_since").From("volumes").
890						Where(sq.Eq{"handle": "some-handle3"}).RunWith(dbConn).QueryRow().Scan(&missingSince)
891					Expect(err).ToNot(HaveOccurred())
892					Expect(missingSince.Valid).To(BeFalse())
893				})
894			})
895
896			It("should mark volumes not in the subset and not already marked as missing", func() {
897				err = psql.Select("missing_since").From("volumes").
898					Where(sq.Eq{"handle": "some-handle1"}).RunWith(dbConn).QueryRow().Scan(&missingSince)
899				Expect(err).ToNot(HaveOccurred())
900				Expect(missingSince.Valid).To(BeFalse())
901
902				err = psql.Select("missing_since").From("volumes").
903					Where(sq.Eq{"handle": "some-handle2"}).RunWith(dbConn).QueryRow().Scan(&missingSince)
904				Expect(err).ToNot(HaveOccurred())
905				Expect(missingSince.Valid).To(BeTrue())
906
907				err = psql.Select("missing_since").From("volumes").
908					Where(sq.Eq{"handle": "some-handle3"}).RunWith(dbConn).QueryRow().Scan(&missingSince)
909				Expect(err).ToNot(HaveOccurred())
910				Expect(missingSince.Valid).To(BeTrue())
911				Expect(missingSince.Time.Unix()).To(Equal(today.Unix()))
912			})
913
914			It("does not return an error", func() {
915				Expect(err).ToNot(HaveOccurred())
916			})
917		})
918
919		Context("when the reported handles is the full set", func() {
920			BeforeEach(func() {
921				handles = []string{"some-handle1", "some-handle2"}
922			})
923
924			It("should not update", func() {
925				err = psql.Select("missing_since").From("volumes").
926					Where(sq.Eq{"handle": "some-handle1"}).RunWith(dbConn).QueryRow().Scan(&missingSince)
927				Expect(err).ToNot(HaveOccurred())
928				Expect(missingSince.Valid).To(BeFalse())
929
930				err = psql.Select("missing_since").From("volumes").
931					Where(sq.Eq{"handle": "some-handle2"}).RunWith(dbConn).QueryRow().Scan(&missingSince)
932				Expect(err).ToNot(HaveOccurred())
933				Expect(missingSince.Valid).To(BeFalse())
934			})
935
936			It("does not return an error", func() {
937				Expect(err).ToNot(HaveOccurred())
938			})
939		})
940
941		Context("when the reported handles includes a volume marked as missing", func() {
942			BeforeEach(func() {
943				handles = []string{"some-handle1", "some-handle2", "some-handle3"}
944			})
945
946			It("should mark the previously missing volume as not missing", func() {
947				err = psql.Select("missing_since").From("volumes").
948					Where(sq.Eq{"handle": "some-handle1"}).RunWith(dbConn).QueryRow().Scan(&missingSince)
949				Expect(err).ToNot(HaveOccurred())
950				Expect(missingSince.Valid).To(BeFalse())
951
952				err = psql.Select("missing_since").From("volumes").
953					Where(sq.Eq{"handle": "some-handle2"}).RunWith(dbConn).QueryRow().Scan(&missingSince)
954				Expect(err).ToNot(HaveOccurred())
955				Expect(missingSince.Valid).To(BeFalse())
956
957				err = psql.Select("missing_since").From("volumes").
958					Where(sq.Eq{"handle": "some-handle3"}).RunWith(dbConn).QueryRow().Scan(&missingSince)
959				Expect(err).ToNot(HaveOccurred())
960				Expect(missingSince.Valid).To(BeFalse())
961			})
962
963			It("does not return an error", func() {
964				Expect(err).ToNot(HaveOccurred())
965			})
966		})
967	})
968
969	Describe("DestroyUnknownVolumes", func() {
970		var (
971			err                   error
972			workerReportedHandles []string
973			num                   int
974		)
975
976		BeforeEach(func() {
977			result, err := psql.Insert("volumes").SetMap(map[string]interface{}{
978				"state":       db.VolumeStateDestroying,
979				"handle":      "some-handle1",
980				"worker_name": defaultWorker.Name(),
981			}).RunWith(dbConn).Exec()
982
983			Expect(err).ToNot(HaveOccurred())
984			Expect(result.RowsAffected()).To(Equal(int64(1)))
985
986			result, err = psql.Insert("volumes").SetMap(map[string]interface{}{
987				"state":       db.VolumeStateCreated,
988				"handle":      "some-handle2",
989				"worker_name": defaultWorker.Name(),
990			}).RunWith(dbConn).Exec()
991
992			Expect(err).ToNot(HaveOccurred())
993			Expect(result.RowsAffected()).To(Equal(int64(1)))
994		})
995
996		JustBeforeEach(func() {
997			num, err = volumeRepository.DestroyUnknownVolumes(defaultWorker.Name(), workerReportedHandles)
998			Expect(err).ToNot(HaveOccurred())
999		})
1000
1001		Context("when there are volumes on the worker that are not in the db", func() {
1002			var destroyingVolumeHandles []string
1003			BeforeEach(func() {
1004				workerReportedHandles = []string{"some-handle3", "some-handle4"}
1005				destroyingVolumeHandles = append(workerReportedHandles, "some-handle1")
1006			})
1007
1008			It("adds new destroying volumes to the database", func() {
1009				result, err := psql.Select("handle").
1010					From("volumes").
1011					Where(sq.Eq{"state": db.VolumeStateDestroying}).
1012					RunWith(dbConn).Query()
1013
1014				Expect(err).ToNot(HaveOccurred())
1015
1016				var handle string
1017				for result.Next() {
1018					err = result.Scan(&handle)
1019					Expect(err).ToNot(HaveOccurred())
1020					Expect(handle).Should(BeElementOf(destroyingVolumeHandles))
1021				}
1022				Expect(num).To(Equal(2))
1023			})
1024
1025			It("does not affect volumes in any other state", func() {
1026				result, err := psql.Select("*").
1027					From("volumes").
1028					Where(sq.Eq{"state": db.VolumeStateCreated}).
1029					RunWith(dbConn).Exec()
1030
1031				Expect(err).ToNot(HaveOccurred())
1032				Expect(result.RowsAffected()).To(Equal(int64(1)))
1033				Expect(num).To(Equal(2))
1034			})
1035		})
1036
1037		Context("when there are no unknown volumes on the worker", func() {
1038			BeforeEach(func() {
1039				workerReportedHandles = []string{"some-handle1", "some-handle2"}
1040			})
1041
1042			It("should not try to destroy anything", func() {
1043				Expect(num).To(Equal(0))
1044				result, err := psql.Select("handle, state").
1045					From("volumes").
1046					Where(sq.Eq{"state": db.VolumeStateDestroying}).
1047					RunWith(dbConn).Exec()
1048
1049				Expect(err).ToNot(HaveOccurred())
1050				Expect(result.RowsAffected()).To(Equal(int64(1)))
1051			})
1052		})
1053	})
1054})
1055