1package db_test 2 3import ( 4 "time" 5 6 sq "github.com/Masterminds/squirrel" 7 "github.com/concourse/concourse/atc" 8 "github.com/concourse/concourse/atc/db" 9 "github.com/lib/pq" 10 . "github.com/onsi/ginkgo" 11 . "github.com/onsi/gomega" 12) 13 14var _ = Describe("VolumeRepository", func() { 15 var ( 16 team2 db.Team 17 usedResourceCache db.UsedResourceCache 18 build db.Build 19 ) 20 21 BeforeEach(func() { 22 var err error 23 build, err = defaultTeam.CreateOneOffBuild() 24 Expect(err).ToNot(HaveOccurred()) 25 26 usedResourceCache, err = resourceCacheFactory.FindOrCreateResourceCache( 27 db.ForBuild(build.ID()), 28 "some-type", 29 atc.Version{"some": "version"}, 30 atc.Source{ 31 "some": "source", 32 }, 33 atc.Params{"some": "params"}, 34 atc.VersionedResourceTypes{ 35 atc.VersionedResourceType{ 36 ResourceType: atc.ResourceType{ 37 Name: "some-type", 38 Type: "some-base-resource-type", 39 Source: atc.Source{ 40 "some-type": "source", 41 }, 42 }, 43 Version: atc.Version{"some-type": "version"}, 44 }, 45 }, 46 ) 47 Expect(err).NotTo(HaveOccurred()) 48 }) 49 50 Describe("GetTeamVolumes", func() { 51 var ( 52 team1handles []string 53 team2handles []string 54 ) 55 56 It("returns task cache volumes", func() { 57 taskCache, err := taskCacheFactory.FindOrCreate(defaultJob.ID(), "some-step", "some-path") 58 Expect(err).NotTo(HaveOccurred()) 59 60 usedWorkerTaskCache, err := workerTaskCacheFactory.FindOrCreate(db.WorkerTaskCache{ 61 TaskCache: taskCache, 62 WorkerName: defaultWorker.Name(), 63 }) 64 Expect(err).NotTo(HaveOccurred()) 65 66 creatingVolume, err := volumeRepository.CreateTaskCacheVolume(defaultTeam.ID(), usedWorkerTaskCache) 67 Expect(err).NotTo(HaveOccurred()) 68 69 createdVolume, err := creatingVolume.Created() 70 Expect(err).NotTo(HaveOccurred()) 71 72 volumes, err := volumeRepository.GetTeamVolumes(defaultTeam.ID()) 73 Expect(err).NotTo(HaveOccurred()) 74 75 Expect(volumes).To(HaveLen(1)) 76 Expect(volumes[0].Handle()).To(Equal(createdVolume.Handle())) 77 Expect(volumes[0].Type()).To(Equal(db.VolumeTypeTaskCache)) 78 }) 79 80 Context("with container volumes", func() { 81 JustBeforeEach(func() { 82 creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{ 83 Type: "task", 84 StepName: "some-task", 85 }) 86 Expect(err).ToNot(HaveOccurred()) 87 88 team1handles = []string{} 89 team2handles = []string{} 90 91 team2, err = teamFactory.CreateTeam(atc.Team{Name: "some-other-defaultTeam"}) 92 Expect(err).ToNot(HaveOccurred()) 93 94 creatingVolume1, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-1") 95 Expect(err).NotTo(HaveOccurred()) 96 createdVolume1, err := creatingVolume1.Created() 97 Expect(err).NotTo(HaveOccurred()) 98 team1handles = append(team1handles, createdVolume1.Handle()) 99 100 creatingVolume2, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-2") 101 Expect(err).NotTo(HaveOccurred()) 102 createdVolume2, err := creatingVolume2.Created() 103 Expect(err).NotTo(HaveOccurred()) 104 team1handles = append(team1handles, createdVolume2.Handle()) 105 106 creatingVolume3, err := volumeRepository.CreateContainerVolume(team2.ID(), defaultWorker.Name(), creatingContainer, "some-path-3") 107 Expect(err).NotTo(HaveOccurred()) 108 createdVolume3, err := creatingVolume3.Created() 109 Expect(err).NotTo(HaveOccurred()) 110 team2handles = append(team2handles, createdVolume3.Handle()) 111 }) 112 113 It("returns only the matching defaultTeam's volumes", func() { 114 createdVolumes, err := volumeRepository.GetTeamVolumes(defaultTeam.ID()) 115 Expect(err).NotTo(HaveOccurred()) 116 createdHandles := []string{} 117 for _, vol := range createdVolumes { 118 createdHandles = append(createdHandles, vol.Handle()) 119 } 120 Expect(createdHandles).To(Equal(team1handles)) 121 122 createdVolumes2, err := volumeRepository.GetTeamVolumes(team2.ID()) 123 Expect(err).NotTo(HaveOccurred()) 124 createdHandles2 := []string{} 125 for _, vol := range createdVolumes2 { 126 createdHandles2 = append(createdHandles2, vol.Handle()) 127 } 128 Expect(createdHandles2).To(Equal(team2handles)) 129 }) 130 131 Context("when worker is stalled", func() { 132 BeforeEach(func() { 133 var err error 134 defaultWorker, err = workerFactory.SaveWorker(defaultWorkerPayload, -10*time.Minute) 135 Expect(err).NotTo(HaveOccurred()) 136 stalledWorkers, err := workerLifecycle.StallUnresponsiveWorkers() 137 Expect(err).NotTo(HaveOccurred()) 138 Expect(stalledWorkers).To(ContainElement(defaultWorker.Name())) 139 }) 140 141 It("returns volumes", func() { 142 createdVolumes, err := volumeRepository.GetTeamVolumes(defaultTeam.ID()) 143 Expect(err).NotTo(HaveOccurred()) 144 createdHandles := []string{} 145 for _, vol := range createdVolumes { 146 createdHandles = append(createdHandles, vol.Handle()) 147 } 148 Expect(createdHandles).To(Equal(team1handles)) 149 150 createdVolumes2, err := volumeRepository.GetTeamVolumes(team2.ID()) 151 Expect(err).NotTo(HaveOccurred()) 152 createdHandles2 := []string{} 153 for _, vol := range createdVolumes2 { 154 createdHandles2 = append(createdHandles2, vol.Handle()) 155 } 156 Expect(createdHandles2).To(Equal(team2handles)) 157 }) 158 }) 159 }) 160 }) 161 162 Describe("GetOrphanedVolumes", func() { 163 var ( 164 expectedCreatedHandles []string 165 expectedDestroyingHandles []string 166 certsVolumeHandle string 167 ) 168 169 BeforeEach(func() { 170 creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{ 171 Type: "task", 172 StepName: "some-task", 173 }) 174 Expect(err).ToNot(HaveOccurred()) 175 expectedCreatedHandles = []string{} 176 expectedDestroyingHandles = []string{} 177 178 creatingVolume1, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-1") 179 Expect(err).NotTo(HaveOccurred()) 180 createdVolume1, err := creatingVolume1.Created() 181 Expect(err).NotTo(HaveOccurred()) 182 expectedCreatedHandles = append(expectedCreatedHandles, createdVolume1.Handle()) 183 184 creatingVolume2, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-2") 185 Expect(err).NotTo(HaveOccurred()) 186 createdVolume2, err := creatingVolume2.Created() 187 Expect(err).NotTo(HaveOccurred()) 188 expectedCreatedHandles = append(expectedCreatedHandles, createdVolume2.Handle()) 189 190 creatingVolume3, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-3") 191 Expect(err).NotTo(HaveOccurred()) 192 createdVolume3, err := creatingVolume3.Created() 193 Expect(err).NotTo(HaveOccurred()) 194 destroyingVolume3, err := createdVolume3.Destroying() 195 Expect(err).NotTo(HaveOccurred()) 196 expectedDestroyingHandles = append(expectedDestroyingHandles, destroyingVolume3.Handle()) 197 198 creatingVolumeOtherWorker, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), otherWorker.Name(), creatingContainer, "some-path-other-1") 199 Expect(err).NotTo(HaveOccurred()) 200 createdVolumeOtherWorker, err := creatingVolumeOtherWorker.Created() 201 Expect(err).NotTo(HaveOccurred()) 202 expectedCreatedHandles = append(expectedCreatedHandles, createdVolumeOtherWorker.Handle()) 203 204 resourceCacheVolume, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-4") 205 Expect(err).NotTo(HaveOccurred()) 206 expectedCreatedHandles = append(expectedCreatedHandles, resourceCacheVolume.Handle()) 207 208 resourceCacheVolumeCreated, err := resourceCacheVolume.Created() 209 Expect(err).NotTo(HaveOccurred()) 210 211 err = resourceCacheVolumeCreated.InitializeResourceCache(usedResourceCache) 212 Expect(err).NotTo(HaveOccurred()) 213 214 artifactVolume, err := volumeRepository.CreateVolume(defaultTeam.ID(), defaultWorker.Name(), db.VolumeTypeArtifact) 215 Expect(err).NotTo(HaveOccurred()) 216 expectedCreatedHandles = append(expectedCreatedHandles, artifactVolume.Handle()) 217 218 _, err = artifactVolume.Created() 219 Expect(err).NotTo(HaveOccurred()) 220 221 usedWorkerBaseResourceType, found, err := workerBaseResourceTypeFactory.Find(defaultWorkerResourceType.Type, defaultWorker) 222 Expect(err).ToNot(HaveOccurred()) 223 Expect(found).To(BeTrue()) 224 225 baseResourceTypeVolume, err := volumeRepository.CreateBaseResourceTypeVolume(usedWorkerBaseResourceType) 226 Expect(err).NotTo(HaveOccurred()) 227 228 oldResourceTypeVolume, err := baseResourceTypeVolume.Created() 229 Expect(err).NotTo(HaveOccurred()) 230 expectedCreatedHandles = append(expectedCreatedHandles, oldResourceTypeVolume.Handle()) 231 232 newVersion := defaultWorkerResourceType 233 newVersion.Version = "some-new-brt-version" 234 235 newWorker := defaultWorkerPayload 236 newWorker.ResourceTypes = []atc.WorkerResourceType{newVersion} 237 238 defaultWorker, err = workerFactory.SaveWorker(newWorker, 0) 239 Expect(err).ToNot(HaveOccurred()) 240 241 tx, err := dbConn.Begin() 242 Expect(err).NotTo(HaveOccurred()) 243 workerResourceCerts, err := db.WorkerResourceCerts{ 244 WorkerName: defaultWorker.Name(), 245 CertsPath: "/etc/blah/blah/certs", 246 }.FindOrCreate(tx) 247 Expect(err).NotTo(HaveOccurred()) 248 err = tx.Commit() 249 Expect(err).NotTo(HaveOccurred()) 250 251 certsVolume, err := volumeRepository.CreateResourceCertsVolume(defaultWorker.Name(), workerResourceCerts) 252 Expect(err).NotTo(HaveOccurred()) 253 254 certsVolumeHandle = certsVolume.Handle() 255 256 deleted, err := build.Delete() 257 Expect(err).NotTo(HaveOccurred()) 258 Expect(deleted).To(BeTrue()) 259 260 deleteTx, err := dbConn.Begin() 261 Expect(err).ToNot(HaveOccurred()) 262 deleted, err = usedResourceCache.Destroy(deleteTx) 263 Expect(err).NotTo(HaveOccurred()) 264 Expect(deleted).To(BeTrue()) 265 Expect(deleteTx.Commit()).To(Succeed()) 266 267 createdContainer, err := creatingContainer.Created() 268 Expect(err).NotTo(HaveOccurred()) 269 destroyingContainer, err := createdContainer.Destroying() 270 Expect(err).NotTo(HaveOccurred()) 271 destroyed, err := destroyingContainer.Destroy() 272 Expect(err).NotTo(HaveOccurred()) 273 Expect(destroyed).To(BeTrue()) 274 }) 275 276 It("returns orphaned volumes", func() { 277 createdVolumes, err := volumeRepository.GetOrphanedVolumes() 278 Expect(err).NotTo(HaveOccurred()) 279 createdHandles := []string{} 280 281 for _, vol := range createdVolumes { 282 createdHandles = append(createdHandles, vol.Handle()) 283 } 284 Expect(createdHandles).To(ConsistOf(expectedCreatedHandles)) 285 Expect(createdHandles).ToNot(ContainElement(certsVolumeHandle)) 286 }) 287 288 Context("when worker is stalled", func() { 289 BeforeEach(func() { 290 var err error 291 defaultWorker, err = workerFactory.SaveWorker(defaultWorkerPayload, -11*time.Minute) 292 Expect(err).NotTo(HaveOccurred()) 293 stalledWorkers, err := workerLifecycle.StallUnresponsiveWorkers() 294 Expect(err).NotTo(HaveOccurred()) 295 Expect(stalledWorkers).To(ContainElement(defaultWorker.Name())) 296 }) 297 298 It("does not return volumes from stalled worker", func() { 299 createdVolumes, err := volumeRepository.GetOrphanedVolumes() 300 Expect(err).NotTo(HaveOccurred()) 301 302 for _, v := range createdVolumes { 303 Expect(v.WorkerName()).ToNot(Equal(defaultWorker.Name())) 304 } 305 }) 306 }) 307 308 Context("when worker is landed", func() { 309 BeforeEach(func() { 310 err := defaultWorker.Land() 311 Expect(err).NotTo(HaveOccurred()) 312 landedWorkers, err := workerLifecycle.LandFinishedLandingWorkers() 313 Expect(err).NotTo(HaveOccurred()) 314 Expect(landedWorkers).To(ContainElement(defaultWorker.Name())) 315 }) 316 317 It("does not return volumes for the worker", func() { 318 createdVolumes, err := volumeRepository.GetOrphanedVolumes() 319 Expect(err).NotTo(HaveOccurred()) 320 321 for _, v := range createdVolumes { 322 Expect(v.WorkerName()).ToNot(Equal(defaultWorker.Name())) 323 } 324 }) 325 }) 326 }) 327 328 Describe("DestroyFailedVolumes", func() { 329 BeforeEach(func() { 330 creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{ 331 Type: "task", 332 StepName: "some-task", 333 }) 334 Expect(err).ToNot(HaveOccurred()) 335 336 creatingVolume1, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-1") 337 Expect(err).NotTo(HaveOccurred()) 338 _, err = creatingVolume1.Failed() 339 Expect(err).NotTo(HaveOccurred()) 340 }) 341 342 It("returns length of failed volumes", func() { 343 failedVolumes, err := volumeRepository.DestroyFailedVolumes() 344 Expect(err).NotTo(HaveOccurred()) 345 Expect(failedVolumes).To(Equal(1)) 346 }) 347 }) 348 349 Describe("GetDestroyingVolumes", func() { 350 var expectedDestroyingHandles []string 351 var destroyingVol db.DestroyingVolume 352 353 Context("when worker has detroying volumes", func() { 354 BeforeEach(func() { 355 creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{ 356 Type: "task", 357 StepName: "some-task", 358 }) 359 Expect(err).ToNot(HaveOccurred()) 360 361 expectedDestroyingHandles = []string{} 362 363 creatingVol, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-1") 364 Expect(err).NotTo(HaveOccurred()) 365 366 createdVol, err := creatingVol.Created() 367 Expect(err).NotTo(HaveOccurred()) 368 369 destroyingVol, err = createdVol.Destroying() 370 Expect(err).NotTo(HaveOccurred()) 371 372 expectedDestroyingHandles = append(expectedDestroyingHandles, destroyingVol.Handle()) 373 }) 374 375 It("returns destroying volumes", func() { 376 destroyingVolumes, err := volumeRepository.GetDestroyingVolumes(defaultWorker.Name()) 377 Expect(err).NotTo(HaveOccurred()) 378 Expect(destroyingVolumes).To(Equal(expectedDestroyingHandles)) 379 }) 380 Context("when worker doesn't have detroying volume", func() { 381 BeforeEach(func() { 382 deleted, err := destroyingVol.Destroy() 383 Expect(err).NotTo(HaveOccurred()) 384 Expect(deleted).To(BeTrue()) 385 }) 386 387 It("returns empty volumes", func() { 388 destroyingVolumes, err := volumeRepository.GetDestroyingVolumes(defaultWorker.Name()) 389 Expect(err).NotTo(HaveOccurred()) 390 Expect(destroyingVolumes).To(BeEmpty()) 391 }) 392 }) 393 }) 394 }) 395 396 Describe("CreateBaseResourceTypeVolume", func() { 397 var usedWorkerBaseResourceType *db.UsedWorkerBaseResourceType 398 BeforeEach(func() { 399 workerBaseResourceTypeFactory := db.NewWorkerBaseResourceTypeFactory(dbConn) 400 var err error 401 var found bool 402 usedWorkerBaseResourceType, found, err = workerBaseResourceTypeFactory.Find("some-base-resource-type", defaultWorker) 403 Expect(err).NotTo(HaveOccurred()) 404 Expect(found).To(BeTrue()) 405 }) 406 407 It("creates a CreatingVolume with no team ID set", func() { 408 volume, err := volumeRepository.CreateBaseResourceTypeVolume(usedWorkerBaseResourceType) 409 Expect(err).NotTo(HaveOccurred()) 410 var teamID int 411 err = psql.Select("team_id").From("volumes"). 412 Where(sq.Eq{"handle": volume.Handle()}).RunWith(dbConn).QueryRow().Scan(&teamID) 413 Expect(err).To(HaveOccurred()) 414 Expect(err.Error()).To(ContainSubstring("Scan error")) 415 }) 416 }) 417 418 Describe("CreateVolume", func() { 419 It("creates a CreatingVolume of the given type with a teamID", func() { 420 volume, err := volumeRepository.CreateVolume(defaultTeam.ID(), defaultWorker.Name(), db.VolumeTypeArtifact) 421 Expect(err).NotTo(HaveOccurred()) 422 var teamID int 423 var workerName string 424 err = psql.Select("team_id, worker_name").From("volumes"). 425 Where(sq.Eq{"handle": volume.Handle()}).RunWith(dbConn).QueryRow().Scan(&teamID, &workerName) 426 Expect(err).NotTo(HaveOccurred()) 427 Expect(teamID).To(Equal(defaultTeam.ID())) 428 Expect(workerName).To(Equal(defaultWorker.Name())) 429 }) 430 }) 431 432 Describe("FindBaseResourceTypeVolume", func() { 433 var usedWorkerBaseResourceType *db.UsedWorkerBaseResourceType 434 BeforeEach(func() { 435 workerBaseResourceTypeFactory := db.NewWorkerBaseResourceTypeFactory(dbConn) 436 var err error 437 var found bool 438 usedWorkerBaseResourceType, found, err = workerBaseResourceTypeFactory.Find("some-base-resource-type", defaultWorker) 439 Expect(err).NotTo(HaveOccurred()) 440 Expect(found).To(BeTrue()) 441 }) 442 443 Context("when there is a created volume for base resource type", func() { 444 var existingVolume db.CreatedVolume 445 446 BeforeEach(func() { 447 var err error 448 volume, err := volumeRepository.CreateBaseResourceTypeVolume(usedWorkerBaseResourceType) 449 Expect(err).NotTo(HaveOccurred()) 450 existingVolume, err = volume.Created() 451 Expect(err).NotTo(HaveOccurred()) 452 }) 453 454 It("returns created volume", func() { 455 creatingVolume, createdVolume, err := volumeRepository.FindBaseResourceTypeVolume(usedWorkerBaseResourceType) 456 Expect(err).NotTo(HaveOccurred()) 457 Expect(creatingVolume).To(BeNil()) 458 Expect(createdVolume).ToNot(BeNil()) 459 Expect(createdVolume.Handle()).To(Equal(existingVolume.Handle())) 460 }) 461 }) 462 463 Context("when there is a creating volume for base resource type", func() { 464 var existingVolume db.CreatingVolume 465 466 BeforeEach(func() { 467 var err error 468 existingVolume, err = volumeRepository.CreateBaseResourceTypeVolume(usedWorkerBaseResourceType) 469 Expect(err).NotTo(HaveOccurred()) 470 }) 471 472 It("returns creating volume", func() { 473 creatingVolume, createdVolume, err := volumeRepository.FindBaseResourceTypeVolume(usedWorkerBaseResourceType) 474 Expect(err).NotTo(HaveOccurred()) 475 Expect(creatingVolume).ToNot(BeNil()) 476 Expect(creatingVolume.Handle()).To(Equal(existingVolume.Handle())) 477 Expect(createdVolume).To(BeNil()) 478 }) 479 }) 480 }) 481 482 Describe("FindResourceCacheVolume", func() { 483 var usedResourceCache db.UsedResourceCache 484 485 BeforeEach(func() { 486 build, err := defaultPipeline.CreateOneOffBuild() 487 Expect(err).NotTo(HaveOccurred()) 488 489 usedResourceCache, err = resourceCacheFactory.FindOrCreateResourceCache( 490 db.ForBuild(build.ID()), 491 "some-type", 492 atc.Version{"some": "version"}, 493 atc.Source{ 494 "some": "source", 495 }, 496 atc.Params{"some": "params"}, 497 atc.VersionedResourceTypes{ 498 atc.VersionedResourceType{ 499 ResourceType: atc.ResourceType{ 500 Name: "some-type", 501 Type: "some-base-resource-type", 502 Source: atc.Source{ 503 "some-type": "source", 504 }, 505 }, 506 Version: atc.Version{"some-type": "version"}, 507 }, 508 }, 509 ) 510 Expect(err).ToNot(HaveOccurred()) 511 }) 512 513 Context("when there is a created volume for resource cache", func() { 514 var existingVolume db.CreatedVolume 515 516 BeforeEach(func() { 517 var err error 518 creatingContainer, err := defaultWorker.CreateContainer(db.NewBuildStepContainerOwner(build.ID(), "some-plan", defaultTeam.ID()), db.ContainerMetadata{ 519 Type: "get", 520 StepName: "some-resource", 521 }) 522 Expect(err).ToNot(HaveOccurred()) 523 524 resourceCacheVolume, err := volumeRepository.CreateContainerVolume(defaultTeam.ID(), defaultWorker.Name(), creatingContainer, "some-path-4") 525 Expect(err).NotTo(HaveOccurred()) 526 527 existingVolume, err = resourceCacheVolume.Created() 528 Expect(err).NotTo(HaveOccurred()) 529 530 err = existingVolume.InitializeResourceCache(usedResourceCache) 531 Expect(err).NotTo(HaveOccurred()) 532 }) 533 534 It("returns created volume", func() { 535 createdVolume, found, err := volumeRepository.FindResourceCacheVolume(defaultWorker.Name(), usedResourceCache) 536 Expect(err).NotTo(HaveOccurred()) 537 Expect(createdVolume.Handle()).To(Equal(existingVolume.Handle())) 538 Expect(found).To(BeTrue()) 539 }) 540 }) 541 }) 542 543 Describe("RemoveDestroyingVolumes", func() { 544 var failedErr error 545 var numDeleted int 546 var handles []string 547 548 JustBeforeEach(func() { 549 numDeleted, failedErr = volumeRepository.RemoveDestroyingVolumes(defaultWorker.Name(), handles) 550 }) 551 552 Context("when there are volumes to destroy", func() { 553 554 Context("when volume is in destroying state", func() { 555 BeforeEach(func() { 556 handles = []string{"some-handle1", "some-handle2"} 557 result, err := psql.Insert("volumes").SetMap(map[string]interface{}{ 558 "state": "destroying", 559 "handle": "123-456-abc-def", 560 "worker_name": defaultWorker.Name(), 561 }).RunWith(dbConn).Exec() 562 563 Expect(err).ToNot(HaveOccurred()) 564 Expect(result.RowsAffected()).To(Equal(int64(1))) 565 }) 566 It("should destroy", func() { 567 result, err := psql.Select("*").From("volumes"). 568 Where(sq.Eq{"handle": "123-456-abc-def"}).RunWith(dbConn).Exec() 569 570 Expect(err).ToNot(HaveOccurred()) 571 Expect(result.RowsAffected()).To(Equal(int64(0))) 572 }) 573 It("returns the correct number of rows removed", func() { 574 Expect(numDeleted).To(Equal(1)) 575 }) 576 It("does not return an error", func() { 577 Expect(failedErr).ToNot(HaveOccurred()) 578 }) 579 }) 580 581 Context("when handles are empty list", func() { 582 BeforeEach(func() { 583 handles = []string{} 584 result, err := psql.Insert("volumes").SetMap(map[string]interface{}{ 585 "state": "destroying", 586 "handle": "123-456-abc-def", 587 "worker_name": defaultWorker.Name(), 588 }).RunWith(dbConn).Exec() 589 590 Expect(err).ToNot(HaveOccurred()) 591 Expect(result.RowsAffected()).To(Equal(int64(1))) 592 }) 593 It("should destroy", func() { 594 result, err := psql.Select("*").From("volumes"). 595 Where(sq.Eq{"handle": "123-456-abc-def"}).RunWith(dbConn).Exec() 596 597 Expect(err).ToNot(HaveOccurred()) 598 Expect(result.RowsAffected()).To(Equal(int64(0))) 599 }) 600 It("returns the correct number of rows removed", func() { 601 Expect(numDeleted).To(Equal(1)) 602 }) 603 It("does not return an error", func() { 604 Expect(failedErr).ToNot(HaveOccurred()) 605 }) 606 }) 607 608 Context("when volume is in create/creating state", func() { 609 BeforeEach(func() { 610 handles = []string{"some-handle1", "some-handle2"} 611 result, err := psql.Insert("volumes").SetMap(map[string]interface{}{ 612 "state": "creating", 613 "handle": "123-456-abc-def", 614 "worker_name": defaultWorker.Name(), 615 }).RunWith(dbConn).Exec() 616 617 Expect(err).ToNot(HaveOccurred()) 618 Expect(result.RowsAffected()).To(Equal(int64(1))) 619 }) 620 It("should not destroy", func() { 621 result, err := psql.Select("*").From("volumes"). 622 Where(sq.Eq{"handle": "123-456-abc-def"}).RunWith(dbConn).Exec() 623 624 Expect(err).ToNot(HaveOccurred()) 625 Expect(result.RowsAffected()).To(Equal(int64(1))) 626 }) 627 It("returns the correct number of rows removed", func() { 628 Expect(numDeleted).To(Equal(0)) 629 }) 630 It("does not return an error", func() { 631 Expect(failedErr).ToNot(HaveOccurred()) 632 }) 633 }) 634 }) 635 636 Context("when there are no volumes to destroy", func() { 637 BeforeEach(func() { 638 handles = []string{"some-handle1", "some-handle2"} 639 640 result, err := psql.Insert("volumes").SetMap( 641 map[string]interface{}{ 642 "state": "destroying", 643 "handle": "some-handle1", 644 "worker_name": defaultWorker.Name(), 645 }, 646 ).RunWith(dbConn).Exec() 647 Expect(err).ToNot(HaveOccurred()) 648 Expect(result.RowsAffected()).To(Equal(int64(1))) 649 650 result, err = psql.Insert("volumes").SetMap( 651 map[string]interface{}{ 652 "state": "destroying", 653 "handle": "some-handle2", 654 "worker_name": defaultWorker.Name(), 655 }, 656 ).RunWith(dbConn).Exec() 657 Expect(err).ToNot(HaveOccurred()) 658 Expect(result.RowsAffected()).To(Equal(int64(1))) 659 }) 660 661 It("doesn't destroy volumes that are in handles", func() { 662 result, err := psql.Select("*").From("volumes"). 663 Where(sq.Eq{"handle": handles}).RunWith(dbConn).Exec() 664 665 Expect(err).ToNot(HaveOccurred()) 666 Expect(result.RowsAffected()).To(Equal(int64(2))) 667 }) 668 669 It("does not return an error", func() { 670 Expect(failedErr).ToNot(HaveOccurred()) 671 }) 672 It("returns the correct number of rows removed", func() { 673 Expect(numDeleted).To(Equal(0)) 674 }) 675 }) 676 }) 677 678 Describe("RemoveMissingVolumes", func() { 679 var ( 680 today time.Time 681 gracePeriod time.Duration 682 rowsAffected int 683 err error 684 ) 685 686 JustBeforeEach(func() { 687 rowsAffected, err = volumeRepository.RemoveMissingVolumes(gracePeriod) 688 }) 689 690 Context("when there are multiple volumes with varying missing since times", func() { 691 BeforeEach(func() { 692 today = time.Now() 693 694 _, err = psql.Insert("volumes").SetMap(map[string]interface{}{ 695 "handle": "some-handle-1", 696 "state": db.VolumeStateCreated, 697 "worker_name": defaultWorker.Name(), 698 }).RunWith(dbConn).Exec() 699 Expect(err).NotTo(HaveOccurred()) 700 701 _, err = psql.Insert("volumes").SetMap(map[string]interface{}{ 702 "handle": "some-handle-2", 703 "state": db.VolumeStateCreated, 704 "worker_name": otherWorker.Name(), 705 "missing_since": today, 706 }).RunWith(dbConn).Exec() 707 Expect(err).NotTo(HaveOccurred()) 708 709 _, err = psql.Insert("volumes").SetMap(map[string]interface{}{ 710 "handle": "some-handle-3", 711 "state": db.VolumeStateFailed, 712 "worker_name": otherWorker.Name(), 713 "missing_since": today.Add(-5 * time.Minute), 714 }).RunWith(dbConn).Exec() 715 Expect(err).NotTo(HaveOccurred()) 716 717 _, err = psql.Insert("volumes").SetMap(map[string]interface{}{ 718 "handle": "some-handle-4", 719 "state": db.VolumeStateDestroying, 720 "worker_name": defaultWorker.Name(), 721 "missing_since": today.Add(-10 * time.Minute), 722 }).RunWith(dbConn).Exec() 723 Expect(err).NotTo(HaveOccurred()) 724 }) 725 726 Context("when no created/failed volumes have expired", func() { 727 BeforeEach(func() { 728 gracePeriod = 7 * time.Minute 729 }) 730 731 It("affects no volumes", func() { 732 Expect(err).ToNot(HaveOccurred()) 733 Expect(rowsAffected).To(Equal(0)) 734 }) 735 }) 736 737 Context("when some created/failed volumes have expired", func() { 738 BeforeEach(func() { 739 gracePeriod = 3 * time.Minute 740 }) 741 742 It("affects some volumes", func() { 743 Expect(err).ToNot(HaveOccurred()) 744 Expect(rowsAffected).To(Equal(1)) 745 }) 746 747 It("affects the right volumes", func() { 748 result, err := psql.Select("*").From("volumes"). 749 RunWith(dbConn).Exec() 750 Expect(err).ToNot(HaveOccurred()) 751 Expect(result.RowsAffected()).To(Equal(int64(3))) 752 753 result, err = psql.Select("*").From("volumes"). 754 Where(sq.Eq{"handle": "some-handle-1"}).RunWith(dbConn).Exec() 755 Expect(err).ToNot(HaveOccurred()) 756 Expect(result.RowsAffected()).To(Equal(int64(1))) 757 758 result, err = psql.Select("*").From("volumes"). 759 Where(sq.Eq{"handle": "some-handle-2"}).RunWith(dbConn).Exec() 760 Expect(err).ToNot(HaveOccurred()) 761 Expect(result.RowsAffected()).To(Equal(int64(1))) 762 763 result, err = psql.Select("*").From("volumes"). 764 Where(sq.Eq{"handle": "some-handle-4"}).RunWith(dbConn).Exec() 765 Expect(err).ToNot(HaveOccurred()) 766 Expect(result.RowsAffected()).To(Equal(int64(1))) 767 }) 768 }) 769 }) 770 771 Context("when there is a missing parent volume", func() { 772 BeforeEach(func() { 773 today = time.Now() 774 775 _, err = psql.Insert("volumes").SetMap(map[string]interface{}{ 776 "handle": "alive-handle", 777 "state": db.VolumeStateCreated, 778 "worker_name": defaultWorker.Name(), 779 }).RunWith(dbConn).Exec() 780 Expect(err).NotTo(HaveOccurred()) 781 782 var parentID int 783 err = psql.Insert("volumes").SetMap(map[string]interface{}{ 784 "handle": "parent-handle", 785 "state": db.VolumeStateCreated, 786 "worker_name": defaultWorker.Name(), 787 "missing_since": today.Add(-10 * time.Minute), 788 }).Suffix("RETURNING id").RunWith(dbConn).QueryRow().Scan(&parentID) 789 Expect(err).NotTo(HaveOccurred()) 790 791 _, err = psql.Insert("volumes").SetMap(map[string]interface{}{ 792 "handle": "child-handle", 793 "state": db.VolumeStateCreated, 794 "worker_name": defaultWorker.Name(), 795 "parent_id": parentID, 796 }).RunWith(dbConn).Exec() 797 Expect(err).NotTo(HaveOccurred()) 798 799 gracePeriod = 3 * time.Minute 800 }) 801 802 It("affects some volumes", func() { 803 Expect(err).ToNot(HaveOccurred()) 804 Expect(rowsAffected).To(Equal(2)) 805 }) 806 807 It("removes the child and missing parent volume", func() { 808 var volumeCount int 809 err = psql.Select("COUNT(id)").From("volumes").RunWith(dbConn).QueryRow().Scan(&volumeCount) 810 Expect(err).ToNot(HaveOccurred()) 811 Expect(volumeCount).To(Equal(1)) 812 813 result, err := psql.Select("*").From("volumes"). 814 Where(sq.Eq{"handle": "parent-handle"}).RunWith(dbConn).Exec() 815 Expect(err).ToNot(HaveOccurred()) 816 Expect(result.RowsAffected()).To(Equal(int64(0))) 817 818 result, err = psql.Select("*").From("volumes"). 819 Where(sq.Eq{"handle": "child-handle"}).RunWith(dbConn).Exec() 820 Expect(err).ToNot(HaveOccurred()) 821 Expect(result.RowsAffected()).To(Equal(int64(0))) 822 }) 823 }) 824 }) 825 826 Describe("UpdateVolumesMissingSince", func() { 827 var ( 828 today time.Time 829 err error 830 handles []string 831 missingSince pq.NullTime 832 ) 833 834 BeforeEach(func() { 835 result, err := psql.Insert("volumes").SetMap(map[string]interface{}{ 836 "state": db.VolumeStateDestroying, 837 "handle": "some-handle1", 838 "worker_name": defaultWorker.Name(), 839 }).RunWith(dbConn).Exec() 840 841 Expect(err).ToNot(HaveOccurred()) 842 Expect(result.RowsAffected()).To(Equal(int64(1))) 843 844 result, err = psql.Insert("volumes").SetMap(map[string]interface{}{ 845 "state": db.VolumeStateDestroying, 846 "handle": "some-handle2", 847 "worker_name": defaultWorker.Name(), 848 }).RunWith(dbConn).Exec() 849 850 Expect(err).ToNot(HaveOccurred()) 851 Expect(result.RowsAffected()).To(Equal(int64(1))) 852 853 today = time.Date(2018, 9, 24, 0, 0, 0, 0, time.UTC) 854 855 result, err = psql.Insert("volumes").SetMap(map[string]interface{}{ 856 "state": db.VolumeStateCreated, 857 "handle": "some-handle3", 858 "worker_name": defaultWorker.Name(), 859 "missing_since": today, 860 }).RunWith(dbConn).Exec() 861 862 Expect(err).ToNot(HaveOccurred()) 863 Expect(result.RowsAffected()).To(Equal(int64(1))) 864 }) 865 866 JustBeforeEach(func() { 867 err = volumeRepository.UpdateVolumesMissingSince(defaultWorker.Name(), handles) 868 Expect(err).ToNot(HaveOccurred()) 869 }) 870 871 Context("when the reported handles is a subset", func() { 872 BeforeEach(func() { 873 handles = []string{"some-handle1"} 874 }) 875 876 Context("having the volumes in the creating state in the db", func() { 877 BeforeEach(func() { 878 result, err := psql.Update("volumes"). 879 Where(sq.Eq{"handle": "some-handle3"}). 880 SetMap(map[string]interface{}{ 881 "state": db.VolumeStateCreating, 882 "missing_since": nil, 883 }).RunWith(dbConn).Exec() 884 Expect(err).NotTo(HaveOccurred()) 885 Expect(result.RowsAffected()).To(Equal(int64(1))) 886 }) 887 888 It("does not mark as missing", func() { 889 err = psql.Select("missing_since").From("volumes"). 890 Where(sq.Eq{"handle": "some-handle3"}).RunWith(dbConn).QueryRow().Scan(&missingSince) 891 Expect(err).ToNot(HaveOccurred()) 892 Expect(missingSince.Valid).To(BeFalse()) 893 }) 894 }) 895 896 It("should mark volumes not in the subset and not already marked as missing", func() { 897 err = psql.Select("missing_since").From("volumes"). 898 Where(sq.Eq{"handle": "some-handle1"}).RunWith(dbConn).QueryRow().Scan(&missingSince) 899 Expect(err).ToNot(HaveOccurred()) 900 Expect(missingSince.Valid).To(BeFalse()) 901 902 err = psql.Select("missing_since").From("volumes"). 903 Where(sq.Eq{"handle": "some-handle2"}).RunWith(dbConn).QueryRow().Scan(&missingSince) 904 Expect(err).ToNot(HaveOccurred()) 905 Expect(missingSince.Valid).To(BeTrue()) 906 907 err = psql.Select("missing_since").From("volumes"). 908 Where(sq.Eq{"handle": "some-handle3"}).RunWith(dbConn).QueryRow().Scan(&missingSince) 909 Expect(err).ToNot(HaveOccurred()) 910 Expect(missingSince.Valid).To(BeTrue()) 911 Expect(missingSince.Time.Unix()).To(Equal(today.Unix())) 912 }) 913 914 It("does not return an error", func() { 915 Expect(err).ToNot(HaveOccurred()) 916 }) 917 }) 918 919 Context("when the reported handles is the full set", func() { 920 BeforeEach(func() { 921 handles = []string{"some-handle1", "some-handle2"} 922 }) 923 924 It("should not update", func() { 925 err = psql.Select("missing_since").From("volumes"). 926 Where(sq.Eq{"handle": "some-handle1"}).RunWith(dbConn).QueryRow().Scan(&missingSince) 927 Expect(err).ToNot(HaveOccurred()) 928 Expect(missingSince.Valid).To(BeFalse()) 929 930 err = psql.Select("missing_since").From("volumes"). 931 Where(sq.Eq{"handle": "some-handle2"}).RunWith(dbConn).QueryRow().Scan(&missingSince) 932 Expect(err).ToNot(HaveOccurred()) 933 Expect(missingSince.Valid).To(BeFalse()) 934 }) 935 936 It("does not return an error", func() { 937 Expect(err).ToNot(HaveOccurred()) 938 }) 939 }) 940 941 Context("when the reported handles includes a volume marked as missing", func() { 942 BeforeEach(func() { 943 handles = []string{"some-handle1", "some-handle2", "some-handle3"} 944 }) 945 946 It("should mark the previously missing volume as not missing", func() { 947 err = psql.Select("missing_since").From("volumes"). 948 Where(sq.Eq{"handle": "some-handle1"}).RunWith(dbConn).QueryRow().Scan(&missingSince) 949 Expect(err).ToNot(HaveOccurred()) 950 Expect(missingSince.Valid).To(BeFalse()) 951 952 err = psql.Select("missing_since").From("volumes"). 953 Where(sq.Eq{"handle": "some-handle2"}).RunWith(dbConn).QueryRow().Scan(&missingSince) 954 Expect(err).ToNot(HaveOccurred()) 955 Expect(missingSince.Valid).To(BeFalse()) 956 957 err = psql.Select("missing_since").From("volumes"). 958 Where(sq.Eq{"handle": "some-handle3"}).RunWith(dbConn).QueryRow().Scan(&missingSince) 959 Expect(err).ToNot(HaveOccurred()) 960 Expect(missingSince.Valid).To(BeFalse()) 961 }) 962 963 It("does not return an error", func() { 964 Expect(err).ToNot(HaveOccurred()) 965 }) 966 }) 967 }) 968 969 Describe("DestroyUnknownVolumes", func() { 970 var ( 971 err error 972 workerReportedHandles []string 973 num int 974 ) 975 976 BeforeEach(func() { 977 result, err := psql.Insert("volumes").SetMap(map[string]interface{}{ 978 "state": db.VolumeStateDestroying, 979 "handle": "some-handle1", 980 "worker_name": defaultWorker.Name(), 981 }).RunWith(dbConn).Exec() 982 983 Expect(err).ToNot(HaveOccurred()) 984 Expect(result.RowsAffected()).To(Equal(int64(1))) 985 986 result, err = psql.Insert("volumes").SetMap(map[string]interface{}{ 987 "state": db.VolumeStateCreated, 988 "handle": "some-handle2", 989 "worker_name": defaultWorker.Name(), 990 }).RunWith(dbConn).Exec() 991 992 Expect(err).ToNot(HaveOccurred()) 993 Expect(result.RowsAffected()).To(Equal(int64(1))) 994 }) 995 996 JustBeforeEach(func() { 997 num, err = volumeRepository.DestroyUnknownVolumes(defaultWorker.Name(), workerReportedHandles) 998 Expect(err).ToNot(HaveOccurred()) 999 }) 1000 1001 Context("when there are volumes on the worker that are not in the db", func() { 1002 var destroyingVolumeHandles []string 1003 BeforeEach(func() { 1004 workerReportedHandles = []string{"some-handle3", "some-handle4"} 1005 destroyingVolumeHandles = append(workerReportedHandles, "some-handle1") 1006 }) 1007 1008 It("adds new destroying volumes to the database", func() { 1009 result, err := psql.Select("handle"). 1010 From("volumes"). 1011 Where(sq.Eq{"state": db.VolumeStateDestroying}). 1012 RunWith(dbConn).Query() 1013 1014 Expect(err).ToNot(HaveOccurred()) 1015 1016 var handle string 1017 for result.Next() { 1018 err = result.Scan(&handle) 1019 Expect(err).ToNot(HaveOccurred()) 1020 Expect(handle).Should(BeElementOf(destroyingVolumeHandles)) 1021 } 1022 Expect(num).To(Equal(2)) 1023 }) 1024 1025 It("does not affect volumes in any other state", func() { 1026 result, err := psql.Select("*"). 1027 From("volumes"). 1028 Where(sq.Eq{"state": db.VolumeStateCreated}). 1029 RunWith(dbConn).Exec() 1030 1031 Expect(err).ToNot(HaveOccurred()) 1032 Expect(result.RowsAffected()).To(Equal(int64(1))) 1033 Expect(num).To(Equal(2)) 1034 }) 1035 }) 1036 1037 Context("when there are no unknown volumes on the worker", func() { 1038 BeforeEach(func() { 1039 workerReportedHandles = []string{"some-handle1", "some-handle2"} 1040 }) 1041 1042 It("should not try to destroy anything", func() { 1043 Expect(num).To(Equal(0)) 1044 result, err := psql.Select("handle, state"). 1045 From("volumes"). 1046 Where(sq.Eq{"state": db.VolumeStateDestroying}). 1047 RunWith(dbConn).Exec() 1048 1049 Expect(err).ToNot(HaveOccurred()) 1050 Expect(result.RowsAffected()).To(Equal(int64(1))) 1051 }) 1052 }) 1053 }) 1054}) 1055