1package migration_test 2 3import ( 4 "database/sql" 5 "io/ioutil" 6 "math/rand" 7 "strconv" 8 "strings" 9 "sync" 10 "time" 11 12 "code.cloudfoundry.org/lager" 13 14 "github.com/concourse/concourse/atc/db/lock" 15 "github.com/concourse/concourse/atc/db/migration" 16 "github.com/concourse/concourse/atc/db/migration/migrationfakes" 17 "github.com/lib/pq" 18 19 . "github.com/onsi/ginkgo" 20 . "github.com/onsi/gomega" 21) 22 23const initialSchemaVersion = 1510262030 24const upgradedSchemaVersion = 1510670987 25 26var _ = Describe("Migration", func() { 27 var ( 28 err error 29 db *sql.DB 30 lockDB *sql.DB 31 lockFactory lock.LockFactory 32 bindata *migrationfakes.FakeBindata 33 fakeLogFunc = func(logger lager.Logger, id lock.LockID) {} 34 ) 35 36 BeforeEach(func() { 37 db, err = sql.Open("postgres", postgresRunner.DataSourceName()) 38 Expect(err).NotTo(HaveOccurred()) 39 40 lockDB, err = sql.Open("postgres", postgresRunner.DataSourceName()) 41 Expect(err).NotTo(HaveOccurred()) 42 43 lockFactory = lock.NewLockFactory(lockDB, fakeLogFunc, fakeLogFunc) 44 45 bindata = new(migrationfakes.FakeBindata) 46 bindata.AssetStub = asset 47 }) 48 49 AfterEach(func() { 50 _ = db.Close() 51 _ = lockDB.Close() 52 }) 53 54 Context("Migration test run", func() { 55 It("Runs all the migrations", func() { 56 migrator := migration.NewMigrator(db, lockFactory) 57 58 err := migrator.Up(nil, nil) 59 Expect(err).NotTo(HaveOccurred()) 60 }) 61 }) 62 63 Context("Version Check", func() { 64 It("CurrentVersion reports the current version stored in the database", func() { 65 bindata.AssetNamesReturns([]string{ 66 "1000_some_migration.up.sql", 67 "1510262030_initial_schema.up.sql", 68 "1510670987_update_unique_constraint_for_resource_caches.up.sql", 69 "2000000000_latest_migration_does_not_matter.up.sql", 70 }) 71 bindata.AssetStub = func(name string) ([]byte, error) { 72 if name == "1000_some_migration.up.sql" { 73 return []byte{}, nil 74 } else if name == "2000000000_latest_migration_does_not_matter.up.sql" { 75 return []byte{}, nil 76 } 77 return asset(name) 78 } 79 80 myDatabaseVersion := 1234567890 81 82 SetupMigrationsHistoryTableToExistAtVersion(db, myDatabaseVersion) 83 84 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 85 86 version, err := migrator.CurrentVersion() 87 Expect(err).NotTo(HaveOccurred()) 88 Expect(version).To(Equal(myDatabaseVersion)) 89 }) 90 91 It("SupportedVersion reports the highest supported migration version", func() { 92 93 SetupMigrationsHistoryTableToExistAtVersion(db, initialSchemaVersion) 94 95 bindata.AssetNamesReturns([]string{ 96 "1000_some_migration.up.sql", 97 "1510262030_initial_schema.up.sql", 98 "1510670987_update_unique_constraint_for_resource_caches.up.sql", 99 "300000_this_is_to_prove_we_dont_use_string_sort.up.sql", 100 "2000000000_latest_migration.up.sql", 101 }) 102 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 103 104 version, err := migrator.SupportedVersion() 105 Expect(err).NotTo(HaveOccurred()) 106 Expect(version).To(Equal(2000000000)) 107 }) 108 109 It("Ignores files it can't parse", func() { 110 111 SetupMigrationsHistoryTableToExistAtVersion(db, initialSchemaVersion) 112 113 bindata.AssetNamesReturns([]string{ 114 "1000_some_migration.up.sql", 115 "1510262030_initial_schema.up.sql", 116 "1510670987_update_unique_constraint_for_resource_caches.up.sql", 117 "300000_this_is_to_prove_we_dont_use_string_sort.up.sql", 118 "2000000000_latest_migration.up.sql", 119 "migrations.go", 120 }) 121 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 122 123 version, err := migrator.SupportedVersion() 124 Expect(err).NotTo(HaveOccurred()) 125 Expect(version).To(Equal(2000000000)) 126 }) 127 }) 128 129 Context("Upgrade", func() { 130 Context("old schema_migrations table exist", func() { 131 var dirty bool 132 133 JustBeforeEach(func() { 134 SetupSchemaMigrationsTable(db, 8878, dirty) 135 }) 136 137 Context("dirty state is true", func() { 138 BeforeEach(func() { 139 dirty = true 140 }) 141 It("errors", func() { 142 143 Expect(err).NotTo(HaveOccurred()) 144 145 migrator := migration.NewMigrator(db, lockFactory) 146 147 err = migrator.Up(nil, nil) 148 Expect(err).To(HaveOccurred()) 149 Expect(err.Error()).To(ContainSubstring("Database is in a dirty state")) 150 151 var newTableCreated bool 152 err = db.QueryRow("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='migrations_history')").Scan(&newTableCreated) 153 Expect(newTableCreated).To(BeFalse()) 154 }) 155 }) 156 157 Context("dirty state is false", func() { 158 BeforeEach(func() { 159 dirty = false 160 }) 161 162 It("populate migrations_history table with starting version from schema_migrations table", func() { 163 startTime := time.Now() 164 migrator := migration.NewMigrator(db, lockFactory) 165 166 err = migrator.Up(nil, nil) 167 Expect(err).NotTo(HaveOccurred()) 168 169 var ( 170 version int 171 isDirty bool 172 timeStamp pq.NullTime 173 status string 174 direction string 175 ) 176 err = db.QueryRow("SELECT * from migrations_history ORDER BY tstamp ASC LIMIT 1").Scan(&version, &timeStamp, &direction, &status, &isDirty) 177 Expect(version).To(Equal(8878)) 178 Expect(isDirty).To(BeFalse()) 179 Expect(timeStamp.Time.After(startTime)).To(Equal(true)) 180 Expect(direction).To(Equal("up")) 181 Expect(status).To(Equal("passed")) 182 }) 183 184 Context("when the migrations_history table already exists", func() { 185 It("does not repopulate the migrations_history table", func() { 186 SetupMigrationsHistoryTableToExistAtVersion(db, 8878) 187 startTime := time.Now() 188 migrator := migration.NewMigrator(db, lockFactory) 189 190 err = migrator.Up(nil, nil) 191 Expect(err).NotTo(HaveOccurred()) 192 193 var timeStamp pq.NullTime 194 rows, err := db.Query("SELECT tstamp FROM migrations_history WHERE version=8878") 195 Expect(err).NotTo(HaveOccurred()) 196 var numRows = 0 197 for rows.Next() { 198 err = rows.Scan(&timeStamp) 199 numRows++ 200 } 201 Expect(numRows).To(Equal(1)) 202 Expect(timeStamp.Time.Before(startTime)).To(Equal(true)) 203 }) 204 }) 205 }) 206 }) 207 208 Context("sql migrations", func() { 209 It("runs a migration", func() { 210 simpleMigrationFilename := "1000_test_table_created.up.sql" 211 bindata.AssetReturns([]byte(` 212 BEGIN; 213 CREATE TABLE some_table (id integer); 214 COMMIT; 215 `), nil) 216 217 bindata.AssetNamesReturns([]string{ 218 simpleMigrationFilename, 219 }) 220 221 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 222 223 migrations, err := migrator.Migrations() 224 Expect(err).NotTo(HaveOccurred()) 225 Expect(len(migrations)).To(Equal(1)) 226 227 err = migrator.Up(nil, nil) 228 Expect(err).NotTo(HaveOccurred()) 229 230 By("Creating the table in the database") 231 var exists string 232 err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.tables where table_name = 'some_table')").Scan(&exists) 233 Expect(err).NotTo(HaveOccurred()) 234 Expect(exists).To(Equal("true")) 235 236 By("Updating the migrations_history table") 237 ExpectDatabaseMigrationVersionToEqual(migrator, 1000) 238 }) 239 240 It("ignores migrations before the current version", func() { 241 SetupMigrationsHistoryTableToExistAtVersion(db, 1000) 242 243 simpleMigrationFilename := "1000_test_table_created.up.sql" 244 bindata.AssetStub = func(name string) ([]byte, error) { 245 if name == simpleMigrationFilename { 246 return []byte(` 247 BEGIN; 248 CREATE TABLE some_table (id integer); 249 COMMIT; 250 `), nil 251 } 252 return asset(name) 253 } 254 bindata.AssetNamesReturns([]string{ 255 simpleMigrationFilename, 256 }) 257 258 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 259 err := migrator.Up(nil, nil) 260 Expect(err).NotTo(HaveOccurred()) 261 262 By("Not creating the database referenced in the migration") 263 var exists string 264 err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.tables where table_name = 'some_table')").Scan(&exists) 265 Expect(err).NotTo(HaveOccurred()) 266 Expect(exists).To(Equal("false")) 267 }) 268 269 It("runs the up migrations in ascending order", func() { 270 addTableMigrationFilename := "1000_test_table_created.up.sql" 271 removeTableMigrationFilename := "1001_test_table_created.up.sql" 272 273 bindata.AssetStub = func(name string) ([]byte, error) { 274 if name == addTableMigrationFilename { 275 return []byte(` 276 BEGIN; 277 CREATE TABLE some_table (id integer); 278 COMMIT; 279 `), nil 280 } else if name == removeTableMigrationFilename { 281 return []byte(` 282 BEGIN; 283 DROP TABLE some_table; 284 COMMIT; 285 `), nil 286 } 287 return asset(name) 288 } 289 290 bindata.AssetNamesReturns([]string{ 291 removeTableMigrationFilename, 292 addTableMigrationFilename, 293 }) 294 295 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 296 err := migrator.Up(nil, nil) 297 Expect(err).NotTo(HaveOccurred()) 298 299 }) 300 301 Context("With a transactional migration", func() { 302 It("leaves the database clean after a failure", func() { 303 bindata.AssetNamesReturns([]string{ 304 "1510262030_initial_schema.up.sql", 305 "1525724789_drop_reaper_addr_from_workers.up.sql", 306 }) 307 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 308 309 err := migrator.Up(nil, nil) 310 Expect(err).To(HaveOccurred()) 311 Expect(err.Error()).To(ContainSubstring("rolled back the migration")) 312 ExpectDatabaseMigrationVersionToEqual(migrator, initialSchemaVersion) 313 ExpectMigrationToHaveFailed(db, 1525724789, false) 314 }) 315 }) 316 317 Context("With a non-transactional migration", func() { 318 It("fails if the migration version is in a dirty state", func() { 319 dirtyMigrationFilename := "1510262031_dirty_migration.up.sql" 320 bindata.AssetStub = func(name string) ([]byte, error) { 321 if name == dirtyMigrationFilename { 322 return []byte(` 323 -- NO_TRANSACTION 324 DROP TABLE nonexistent; 325 `), nil 326 } 327 return asset(name) 328 } 329 330 bindata.AssetNamesReturns([]string{ 331 dirtyMigrationFilename, 332 }) 333 334 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 335 336 err := migrator.Up(nil, nil) 337 Expect(err).To(HaveOccurred()) 338 Expect(err.Error()).To(MatchRegexp("Migration.*failed")) 339 340 ExpectMigrationToHaveFailed(db, 1510262031, true) 341 }) 342 343 It("successfully runs a non-transactional migration", func() { 344 bindata.AssetNamesReturns( 345 []string{ 346 "30000_no_transaction_migration.up.sql", 347 }, 348 ) 349 bindata.AssetReturnsOnCall(1, []byte(` 350 -- NO_TRANSACTION 351 CREATE TYPE enum_type AS ENUM ('blue_type', 'green_type'); 352 ALTER TYPE enum_type ADD VALUE 'some_type'; 353 `), nil) 354 startTime := time.Now() 355 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 356 err = migrator.Up(nil, nil) 357 Expect(err).NotTo(HaveOccurred()) 358 359 var ( 360 version int 361 isDirty bool 362 timeStamp pq.NullTime 363 status string 364 direction string 365 ) 366 err = db.QueryRow("SELECT * from migrations_history ORDER BY tstamp DESC").Scan(&version, &timeStamp, &direction, &status, &isDirty) 367 Expect(version).To(Equal(30000)) 368 Expect(isDirty).To(BeFalse()) 369 Expect(timeStamp.Time.After(startTime)).To(Equal(true)) 370 Expect(direction).To(Equal("up")) 371 Expect(status).To(Equal("passed")) 372 }) 373 374 It("gracefully fails on a failing non-transactional migration", func() { 375 bindata.AssetNamesReturns( 376 []string{ 377 "50000_failing_no_transaction_migration.up.sql", 378 }, 379 ) 380 bindata.AssetReturns([]byte(` 381 -- NO_TRANSACTION 382 CREATE TYPE enum_type AS ENUM ('blue_type', 'green_type'); 383 ALTER TYPE nonexistent_enum_type ADD VALUE 'some_type'; 384 `), nil) 385 startTime := time.Now() 386 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 387 err = migrator.Up(nil, nil) 388 Expect(err).To(HaveOccurred()) 389 390 var ( 391 version int 392 isDirty bool 393 timeStamp pq.NullTime 394 status string 395 direction string 396 ) 397 err = db.QueryRow("SELECT * from migrations_history ORDER BY tstamp DESC").Scan(&version, &timeStamp, &direction, &status, &isDirty) 398 Expect(version).To(Equal(50000)) 399 Expect(isDirty).To(BeTrue()) 400 Expect(timeStamp.Time.After(startTime)).To(Equal(true)) 401 Expect(direction).To(Equal("up")) 402 Expect(status).To(Equal("failed")) 403 }) 404 }) 405 406 It("Doesn't fail if there are no migrations to run", func() { 407 bindata.AssetNamesReturns([]string{ 408 "1510262030_initial_schema.up.sql", 409 }) 410 411 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 412 err := migrator.Up(nil, nil) 413 Expect(err).NotTo(HaveOccurred()) 414 415 err = migrator.Up(nil, nil) 416 Expect(err).NotTo(HaveOccurred()) 417 418 ExpectDatabaseMigrationVersionToEqual(migrator, initialSchemaVersion) 419 420 ExpectMigrationVersionTableNotToExist(db) 421 422 ExpectToBeAbleToInsertData(db) 423 }) 424 425 It("Locks the database so multiple ATCs don't all run migrations at the same time", func() { 426 SetupMigrationsHistoryTableToExistAtVersion(db, 1510262030) 427 428 SetupSchemaFromFile(db, "migrations/1510262030_initial_schema.up.sql") 429 430 bindata.AssetNamesReturns([]string{ 431 "1510262030_initial_schema.up.sql", 432 }) 433 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 434 435 var wg sync.WaitGroup 436 wg.Add(3) 437 438 go TryRunUpAndVerifyResult(db, migrator, &wg) 439 go TryRunUpAndVerifyResult(db, migrator, &wg) 440 go TryRunUpAndVerifyResult(db, migrator, &wg) 441 442 wg.Wait() 443 }) 444 }) 445 446 Context("golang migrations", func() { 447 It("runs a migration with Migrate", func() { 448 449 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 450 bindata.AssetNamesReturns([]string{ 451 "1510262030_initial_schema.up.sql", 452 "1516643303_update_auth_providers.up.go", 453 }) 454 455 By("applying the initial migration") 456 err := migrator.Migrate(nil, nil, 1510262030) 457 var columnExists string 458 err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.columns where table_name = 'teams' AND column_name='basic_auth')").Scan(&columnExists) 459 Expect(err).NotTo(HaveOccurred()) 460 Expect(columnExists).To(Equal("true")) 461 462 err = migrator.Migrate(nil, nil, 1516643303) 463 Expect(err).NotTo(HaveOccurred()) 464 465 By("applying the go migration") 466 err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.columns where table_name = 'teams' AND column_name='basic_auth')").Scan(&columnExists) 467 Expect(err).NotTo(HaveOccurred()) 468 Expect(columnExists).To(Equal("false")) 469 470 By("updating the schema migrations table") 471 ExpectDatabaseMigrationVersionToEqual(migrator, 1516643303) 472 }) 473 474 It("runs a migration with Up", func() { 475 476 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 477 bindata.AssetNamesReturns([]string{ 478 "1510262030_initial_schema.up.sql", 479 "1516643303_update_auth_providers.up.go", 480 }) 481 482 err := migrator.Up(nil, nil) 483 Expect(err).NotTo(HaveOccurred()) 484 485 By("applying the migration") 486 var columnExists string 487 err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.columns where table_name = 'teams' AND column_name='basic_auth')").Scan(&columnExists) 488 Expect(err).NotTo(HaveOccurred()) 489 Expect(columnExists).To(Equal("false")) 490 491 By("updating the schema migrations table") 492 ExpectDatabaseMigrationVersionToEqual(migrator, 1516643303) 493 }) 494 }) 495 }) 496 497 Context("Downgrade", func() { 498 Context("Downgrades to a version that uses the old mattes/migrate schema_migrations table", func() { 499 It("Downgrades to a given version and write it to a new created schema_migrations table", func() { 500 bindata.AssetNamesReturns([]string{ 501 "1510262030_initial_schema.up.sql", 502 "1510670987_update_unique_constraint_for_resource_caches.up.sql", 503 "1510670987_update_unique_constraint_for_resource_caches.down.sql", 504 }) 505 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 506 507 err := migrator.Up(nil, nil) 508 Expect(err).NotTo(HaveOccurred()) 509 510 currentVersion, err := migrator.CurrentVersion() 511 Expect(err).NotTo(HaveOccurred()) 512 Expect(currentVersion).To(Equal(upgradedSchemaVersion)) 513 514 err = migrator.Migrate(nil, nil, initialSchemaVersion) 515 Expect(err).NotTo(HaveOccurred()) 516 517 currentVersion, err = migrator.CurrentVersion() 518 Expect(err).NotTo(HaveOccurred()) 519 Expect(currentVersion).To(Equal(initialSchemaVersion)) 520 521 ExpectDatabaseVersionToEqual(db, initialSchemaVersion, "schema_migrations") 522 523 ExpectToBeAbleToInsertData(db) 524 }) 525 526 It("Downgrades to a given version and write it to the existing schema_migrations table with dirty true", func() { 527 528 bindata.AssetNamesReturns([]string{ 529 "1510262030_initial_schema.up.sql", 530 "1510670987_update_unique_constraint_for_resource_caches.up.sql", 531 "1510670987_update_unique_constraint_for_resource_caches.down.sql", 532 }) 533 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 534 535 err := migrator.Up(nil, nil) 536 Expect(err).NotTo(HaveOccurred()) 537 538 currentVersion, err := migrator.CurrentVersion() 539 Expect(err).NotTo(HaveOccurred()) 540 Expect(currentVersion).To(Equal(upgradedSchemaVersion)) 541 542 SetupSchemaMigrationsTable(db, 8878, true) 543 544 err = migrator.Migrate(nil, nil, initialSchemaVersion) 545 Expect(err).NotTo(HaveOccurred()) 546 547 currentVersion, err = migrator.CurrentVersion() 548 Expect(err).NotTo(HaveOccurred()) 549 Expect(currentVersion).To(Equal(initialSchemaVersion)) 550 551 ExpectDatabaseVersionToEqual(db, initialSchemaVersion, "schema_migrations") 552 553 ExpectToBeAbleToInsertData(db) 554 }) 555 }) 556 557 Context("Downgrades to a version with new migrations_history table", func() { 558 It("Downgrades to a given version", func() { 559 bindata.AssetNamesReturns([]string{ 560 "1510262030_initial_schema.up.sql", 561 "1510670987_update_unique_constraint_for_resource_caches.up.sql", 562 "1510670987_update_unique_constraint_for_resource_caches.down.sql", 563 }) 564 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 565 566 err := migrator.Up(nil, nil) 567 Expect(err).NotTo(HaveOccurred()) 568 569 currentVersion, err := migrator.CurrentVersion() 570 Expect(err).NotTo(HaveOccurred()) 571 Expect(currentVersion).To(Equal(upgradedSchemaVersion)) 572 573 err = migrator.Migrate(nil, nil, initialSchemaVersion) 574 Expect(err).NotTo(HaveOccurred()) 575 576 currentVersion, err = migrator.CurrentVersion() 577 Expect(err).NotTo(HaveOccurred()) 578 Expect(currentVersion).To(Equal(initialSchemaVersion)) 579 580 ExpectToBeAbleToInsertData(db) 581 }) 582 583 It("Doesn't fail if already at the requested version", func() { 584 bindata.AssetNamesReturns([]string{ 585 "1510262030_initial_schema.up.sql", 586 "1510670987_update_unique_constraint_for_resource_caches.up.sql", 587 }) 588 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 589 590 err := migrator.Migrate(nil, nil, upgradedSchemaVersion) 591 Expect(err).NotTo(HaveOccurred()) 592 593 currentVersion, err := migrator.CurrentVersion() 594 Expect(err).NotTo(HaveOccurred()) 595 Expect(currentVersion).To(Equal(upgradedSchemaVersion)) 596 597 err = migrator.Migrate(nil, nil, upgradedSchemaVersion) 598 Expect(err).NotTo(HaveOccurred()) 599 600 currentVersion, err = migrator.CurrentVersion() 601 Expect(err).NotTo(HaveOccurred()) 602 Expect(currentVersion).To(Equal(upgradedSchemaVersion)) 603 604 ExpectToBeAbleToInsertData(db) 605 }) 606 607 It("Locks the database so multiple consumers don't run downgrade at the same time", func() { 608 migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata) 609 bindata.AssetNamesReturns([]string{ 610 "1510262030_initial_schema.up.sql", 611 "1510670987_update_unique_constraint_for_resource_caches.up.sql", 612 "1510670987_update_unique_constraint_for_resource_caches.down.sql", 613 }) 614 615 err := migrator.Up(nil, nil) 616 Expect(err).NotTo(HaveOccurred()) 617 618 var wg sync.WaitGroup 619 wg.Add(3) 620 621 go TryRunMigrateAndVerifyResult(db, migrator, initialSchemaVersion, &wg) 622 go TryRunMigrateAndVerifyResult(db, migrator, initialSchemaVersion, &wg) 623 go TryRunMigrateAndVerifyResult(db, migrator, initialSchemaVersion, &wg) 624 625 wg.Wait() 626 }) 627 }) 628 }) 629 630}) 631 632func TryRunUpAndVerifyResult(db *sql.DB, migrator migration.Migrator, wg *sync.WaitGroup) { 633 defer GinkgoRecover() 634 defer wg.Done() 635 636 err := migrator.Up(nil, nil) 637 Expect(err).NotTo(HaveOccurred()) 638 639 ExpectDatabaseMigrationVersionToEqual(migrator, initialSchemaVersion) 640 641 ExpectToBeAbleToInsertData(db) 642} 643 644func TryRunMigrateAndVerifyResult(db *sql.DB, migrator migration.Migrator, version int, wg *sync.WaitGroup) { 645 defer GinkgoRecover() 646 defer wg.Done() 647 648 err := migrator.Migrate(nil, nil, version) 649 Expect(err).NotTo(HaveOccurred()) 650 651 ExpectDatabaseMigrationVersionToEqual(migrator, version) 652 653 ExpectToBeAbleToInsertData(db) 654} 655 656func SetupMigrationsHistoryTableToExistAtVersion(db *sql.DB, version int) { 657 _, err := db.Exec(`CREATE TABLE migrations_history(version bigint, tstamp timestamp with time zone, direction varchar, status varchar, dirty boolean)`) 658 Expect(err).NotTo(HaveOccurred()) 659 660 _, err = db.Exec(`INSERT INTO migrations_history(version, tstamp, direction, status, dirty) VALUES($1, current_timestamp, 'up', 'passed', false)`, version) 661 Expect(err).NotTo(HaveOccurred()) 662} 663 664func SetupSchemaMigrationsTable(db *sql.DB, version int, dirty bool) { 665 _, err := db.Exec("CREATE TABLE IF NOT EXISTS schema_migrations (version bigint, dirty boolean)") 666 Expect(err).NotTo(HaveOccurred()) 667 _, err = db.Exec("INSERT INTO schema_migrations (version, dirty) VALUES ($1, $2)", version, dirty) 668 Expect(err).NotTo(HaveOccurred()) 669} 670 671func SetupSchemaFromFile(db *sql.DB, path string) { 672 migrations, err := ioutil.ReadFile(path) 673 Expect(err).NotTo(HaveOccurred()) 674 675 for _, migration := range strings.Split(string(migrations), ";") { 676 _, err = db.Exec(migration) 677 Expect(err).NotTo(HaveOccurred()) 678 } 679} 680 681func ExpectDatabaseMigrationVersionToEqual(migrator migration.Migrator, expectedVersion int) { 682 var dbVersion int 683 dbVersion, err := migrator.CurrentVersion() 684 Expect(err).NotTo(HaveOccurred()) 685 Expect(dbVersion).To(Equal(expectedVersion)) 686} 687 688func ExpectToBeAbleToInsertData(dbConn *sql.DB) { 689 rand.Seed(time.Now().UnixNano()) 690 691 teamID := rand.Intn(10000) 692 _, err := dbConn.Exec("INSERT INTO teams(id, name) VALUES ($1, $2)", teamID, strconv.Itoa(teamID)) 693 Expect(err).NotTo(HaveOccurred()) 694 695 pipelineID := rand.Intn(10000) 696 _, err = dbConn.Exec("INSERT INTO pipelines(id, team_id, name) VALUES ($1, $2, $3)", pipelineID, teamID, strconv.Itoa(pipelineID)) 697 Expect(err).NotTo(HaveOccurred()) 698 699 jobID := rand.Intn(10000) 700 _, err = dbConn.Exec("INSERT INTO jobs(id, pipeline_id, name, config) VALUES ($1, $2, $3, '{}')", jobID, pipelineID, strconv.Itoa(jobID)) 701 Expect(err).NotTo(HaveOccurred()) 702} 703 704func ExpectMigrationToHaveFailed(dbConn *sql.DB, failedVersion int, expectDirty bool) { 705 var status string 706 var dirty bool 707 err := dbConn.QueryRow("SELECT status, dirty FROM migrations_history WHERE version=$1 ORDER BY tstamp desc LIMIT 1", failedVersion).Scan(&status, &dirty) 708 Expect(err).NotTo(HaveOccurred()) 709 Expect(status).To(Equal("failed")) 710 Expect(dirty).To(Equal(expectDirty)) 711} 712