1package migration_test
2
3import (
4	"database/sql"
5	"io/ioutil"
6	"math/rand"
7	"strconv"
8	"strings"
9	"sync"
10	"time"
11
12	"code.cloudfoundry.org/lager"
13
14	"github.com/concourse/concourse/atc/db/lock"
15	"github.com/concourse/concourse/atc/db/migration"
16	"github.com/concourse/concourse/atc/db/migration/migrationfakes"
17	"github.com/lib/pq"
18
19	. "github.com/onsi/ginkgo"
20	. "github.com/onsi/gomega"
21)
22
// Schema versions used by the specs as upgrade/downgrade reference points.
const (
	// initialSchemaVersion is the version prefix of the
	// "1510262030_initial_schema" migration.
	initialSchemaVersion = 1510262030

	// upgradedSchemaVersion is the version prefix of the
	// "1510670987_update_unique_constraint_for_resource_caches" migration.
	upgradedSchemaVersion = 1510670987
)
25
26var _ = Describe("Migration", func() {
27	var (
28		err         error
29		db          *sql.DB
30		lockDB      *sql.DB
31		lockFactory lock.LockFactory
32		bindata     *migrationfakes.FakeBindata
33		fakeLogFunc = func(logger lager.Logger, id lock.LockID) {}
34	)
35
36	BeforeEach(func() {
37		db, err = sql.Open("postgres", postgresRunner.DataSourceName())
38		Expect(err).NotTo(HaveOccurred())
39
40		lockDB, err = sql.Open("postgres", postgresRunner.DataSourceName())
41		Expect(err).NotTo(HaveOccurred())
42
43		lockFactory = lock.NewLockFactory(lockDB, fakeLogFunc, fakeLogFunc)
44
45		bindata = new(migrationfakes.FakeBindata)
46		bindata.AssetStub = asset
47	})
48
49	AfterEach(func() {
50		_ = db.Close()
51		_ = lockDB.Close()
52	})
53
54	Context("Migration test run", func() {
55		It("Runs all the migrations", func() {
56			migrator := migration.NewMigrator(db, lockFactory)
57
58			err := migrator.Up(nil, nil)
59			Expect(err).NotTo(HaveOccurred())
60		})
61	})
62
63	Context("Version Check", func() {
64		It("CurrentVersion reports the current version stored in the database", func() {
65			bindata.AssetNamesReturns([]string{
66				"1000_some_migration.up.sql",
67				"1510262030_initial_schema.up.sql",
68				"1510670987_update_unique_constraint_for_resource_caches.up.sql",
69				"2000000000_latest_migration_does_not_matter.up.sql",
70			})
71			bindata.AssetStub = func(name string) ([]byte, error) {
72				if name == "1000_some_migration.up.sql" {
73					return []byte{}, nil
74				} else if name == "2000000000_latest_migration_does_not_matter.up.sql" {
75					return []byte{}, nil
76				}
77				return asset(name)
78			}
79
80			myDatabaseVersion := 1234567890
81
82			SetupMigrationsHistoryTableToExistAtVersion(db, myDatabaseVersion)
83
84			migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
85
86			version, err := migrator.CurrentVersion()
87			Expect(err).NotTo(HaveOccurred())
88			Expect(version).To(Equal(myDatabaseVersion))
89		})
90
91		It("SupportedVersion reports the highest supported migration version", func() {
92
93			SetupMigrationsHistoryTableToExistAtVersion(db, initialSchemaVersion)
94
95			bindata.AssetNamesReturns([]string{
96				"1000_some_migration.up.sql",
97				"1510262030_initial_schema.up.sql",
98				"1510670987_update_unique_constraint_for_resource_caches.up.sql",
99				"300000_this_is_to_prove_we_dont_use_string_sort.up.sql",
100				"2000000000_latest_migration.up.sql",
101			})
102			migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
103
104			version, err := migrator.SupportedVersion()
105			Expect(err).NotTo(HaveOccurred())
106			Expect(version).To(Equal(2000000000))
107		})
108
109		It("Ignores files it can't parse", func() {
110
111			SetupMigrationsHistoryTableToExistAtVersion(db, initialSchemaVersion)
112
113			bindata.AssetNamesReturns([]string{
114				"1000_some_migration.up.sql",
115				"1510262030_initial_schema.up.sql",
116				"1510670987_update_unique_constraint_for_resource_caches.up.sql",
117				"300000_this_is_to_prove_we_dont_use_string_sort.up.sql",
118				"2000000000_latest_migration.up.sql",
119				"migrations.go",
120			})
121			migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
122
123			version, err := migrator.SupportedVersion()
124			Expect(err).NotTo(HaveOccurred())
125			Expect(version).To(Equal(2000000000))
126		})
127	})
128
129	Context("Upgrade", func() {
130		Context("old schema_migrations table exist", func() {
131			var dirty bool
132
133			JustBeforeEach(func() {
134				SetupSchemaMigrationsTable(db, 8878, dirty)
135			})
136
137			Context("dirty state is true", func() {
138				BeforeEach(func() {
139					dirty = true
140				})
141				It("errors", func() {
142
143					Expect(err).NotTo(HaveOccurred())
144
145					migrator := migration.NewMigrator(db, lockFactory)
146
147					err = migrator.Up(nil, nil)
148					Expect(err).To(HaveOccurred())
149					Expect(err.Error()).To(ContainSubstring("Database is in a dirty state"))
150
151					var newTableCreated bool
152					err = db.QueryRow("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='migrations_history')").Scan(&newTableCreated)
153					Expect(newTableCreated).To(BeFalse())
154				})
155			})
156
157			Context("dirty state is false", func() {
158				BeforeEach(func() {
159					dirty = false
160				})
161
162				It("populate migrations_history table with starting version from schema_migrations table", func() {
163					startTime := time.Now()
164					migrator := migration.NewMigrator(db, lockFactory)
165
166					err = migrator.Up(nil, nil)
167					Expect(err).NotTo(HaveOccurred())
168
169					var (
170						version   int
171						isDirty   bool
172						timeStamp pq.NullTime
173						status    string
174						direction string
175					)
176					err = db.QueryRow("SELECT * from migrations_history ORDER BY tstamp ASC LIMIT 1").Scan(&version, &timeStamp, &direction, &status, &isDirty)
177					Expect(version).To(Equal(8878))
178					Expect(isDirty).To(BeFalse())
179					Expect(timeStamp.Time.After(startTime)).To(Equal(true))
180					Expect(direction).To(Equal("up"))
181					Expect(status).To(Equal("passed"))
182				})
183
184				Context("when the migrations_history table already exists", func() {
185					It("does not repopulate the migrations_history table", func() {
186						SetupMigrationsHistoryTableToExistAtVersion(db, 8878)
187						startTime := time.Now()
188						migrator := migration.NewMigrator(db, lockFactory)
189
190						err = migrator.Up(nil, nil)
191						Expect(err).NotTo(HaveOccurred())
192
193						var timeStamp pq.NullTime
194						rows, err := db.Query("SELECT tstamp FROM migrations_history WHERE version=8878")
195						Expect(err).NotTo(HaveOccurred())
196						var numRows = 0
197						for rows.Next() {
198							err = rows.Scan(&timeStamp)
199							numRows++
200						}
201						Expect(numRows).To(Equal(1))
202						Expect(timeStamp.Time.Before(startTime)).To(Equal(true))
203					})
204				})
205			})
206		})
207
208		Context("sql migrations", func() {
209			It("runs a migration", func() {
210				simpleMigrationFilename := "1000_test_table_created.up.sql"
211				bindata.AssetReturns([]byte(`
212						BEGIN;
213						CREATE TABLE some_table (id integer);
214						COMMIT;
215						`), nil)
216
217				bindata.AssetNamesReturns([]string{
218					simpleMigrationFilename,
219				})
220
221				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
222
223				migrations, err := migrator.Migrations()
224				Expect(err).NotTo(HaveOccurred())
225				Expect(len(migrations)).To(Equal(1))
226
227				err = migrator.Up(nil, nil)
228				Expect(err).NotTo(HaveOccurred())
229
230				By("Creating the table in the database")
231				var exists string
232				err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.tables where table_name = 'some_table')").Scan(&exists)
233				Expect(err).NotTo(HaveOccurred())
234				Expect(exists).To(Equal("true"))
235
236				By("Updating the migrations_history table")
237				ExpectDatabaseMigrationVersionToEqual(migrator, 1000)
238			})
239
240			It("ignores migrations before the current version", func() {
241				SetupMigrationsHistoryTableToExistAtVersion(db, 1000)
242
243				simpleMigrationFilename := "1000_test_table_created.up.sql"
244				bindata.AssetStub = func(name string) ([]byte, error) {
245					if name == simpleMigrationFilename {
246						return []byte(`
247						BEGIN;
248						CREATE TABLE some_table (id integer);
249						COMMIT;
250						`), nil
251					}
252					return asset(name)
253				}
254				bindata.AssetNamesReturns([]string{
255					simpleMigrationFilename,
256				})
257
258				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
259				err := migrator.Up(nil, nil)
260				Expect(err).NotTo(HaveOccurred())
261
262				By("Not creating the database referenced in the migration")
263				var exists string
264				err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.tables where table_name = 'some_table')").Scan(&exists)
265				Expect(err).NotTo(HaveOccurred())
266				Expect(exists).To(Equal("false"))
267			})
268
269			It("runs the up migrations in ascending order", func() {
270				addTableMigrationFilename := "1000_test_table_created.up.sql"
271				removeTableMigrationFilename := "1001_test_table_created.up.sql"
272
273				bindata.AssetStub = func(name string) ([]byte, error) {
274					if name == addTableMigrationFilename {
275						return []byte(`
276						BEGIN;
277						CREATE TABLE some_table (id integer);
278						COMMIT;
279						`), nil
280					} else if name == removeTableMigrationFilename {
281						return []byte(`
282						BEGIN;
283						DROP TABLE some_table;
284						COMMIT;
285						`), nil
286					}
287					return asset(name)
288				}
289
290				bindata.AssetNamesReturns([]string{
291					removeTableMigrationFilename,
292					addTableMigrationFilename,
293				})
294
295				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
296				err := migrator.Up(nil, nil)
297				Expect(err).NotTo(HaveOccurred())
298
299			})
300
301			Context("With a transactional migration", func() {
302				It("leaves the database clean after a failure", func() {
303					bindata.AssetNamesReturns([]string{
304						"1510262030_initial_schema.up.sql",
305						"1525724789_drop_reaper_addr_from_workers.up.sql",
306					})
307					migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
308
309					err := migrator.Up(nil, nil)
310					Expect(err).To(HaveOccurred())
311					Expect(err.Error()).To(ContainSubstring("rolled back the migration"))
312					ExpectDatabaseMigrationVersionToEqual(migrator, initialSchemaVersion)
313					ExpectMigrationToHaveFailed(db, 1525724789, false)
314				})
315			})
316
317			Context("With a non-transactional migration", func() {
318				It("fails if the migration version is in a dirty state", func() {
319					dirtyMigrationFilename := "1510262031_dirty_migration.up.sql"
320					bindata.AssetStub = func(name string) ([]byte, error) {
321						if name == dirtyMigrationFilename {
322							return []byte(`
323							-- NO_TRANSACTION
324							DROP TABLE nonexistent;
325						`), nil
326						}
327						return asset(name)
328					}
329
330					bindata.AssetNamesReturns([]string{
331						dirtyMigrationFilename,
332					})
333
334					migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
335
336					err := migrator.Up(nil, nil)
337					Expect(err).To(HaveOccurred())
338					Expect(err.Error()).To(MatchRegexp("Migration.*failed"))
339
340					ExpectMigrationToHaveFailed(db, 1510262031, true)
341				})
342
343				It("successfully runs a non-transactional migration", func() {
344					bindata.AssetNamesReturns(
345						[]string{
346							"30000_no_transaction_migration.up.sql",
347						},
348					)
349					bindata.AssetReturnsOnCall(1, []byte(`
350							-- NO_TRANSACTION
351							CREATE TYPE enum_type AS ENUM ('blue_type', 'green_type');
352							ALTER TYPE enum_type ADD VALUE 'some_type';
353					`), nil)
354					startTime := time.Now()
355					migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
356					err = migrator.Up(nil, nil)
357					Expect(err).NotTo(HaveOccurred())
358
359					var (
360						version   int
361						isDirty   bool
362						timeStamp pq.NullTime
363						status    string
364						direction string
365					)
366					err = db.QueryRow("SELECT * from migrations_history ORDER BY tstamp DESC").Scan(&version, &timeStamp, &direction, &status, &isDirty)
367					Expect(version).To(Equal(30000))
368					Expect(isDirty).To(BeFalse())
369					Expect(timeStamp.Time.After(startTime)).To(Equal(true))
370					Expect(direction).To(Equal("up"))
371					Expect(status).To(Equal("passed"))
372				})
373
374				It("gracefully fails on a failing non-transactional migration", func() {
375					bindata.AssetNamesReturns(
376						[]string{
377							"50000_failing_no_transaction_migration.up.sql",
378						},
379					)
380					bindata.AssetReturns([]byte(`
381							-- NO_TRANSACTION
382							CREATE TYPE enum_type AS ENUM ('blue_type', 'green_type');
383							ALTER TYPE nonexistent_enum_type ADD VALUE 'some_type';
384					`), nil)
385					startTime := time.Now()
386					migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
387					err = migrator.Up(nil, nil)
388					Expect(err).To(HaveOccurred())
389
390					var (
391						version   int
392						isDirty   bool
393						timeStamp pq.NullTime
394						status    string
395						direction string
396					)
397					err = db.QueryRow("SELECT * from migrations_history ORDER BY tstamp DESC").Scan(&version, &timeStamp, &direction, &status, &isDirty)
398					Expect(version).To(Equal(50000))
399					Expect(isDirty).To(BeTrue())
400					Expect(timeStamp.Time.After(startTime)).To(Equal(true))
401					Expect(direction).To(Equal("up"))
402					Expect(status).To(Equal("failed"))
403				})
404			})
405
406			It("Doesn't fail if there are no migrations to run", func() {
407				bindata.AssetNamesReturns([]string{
408					"1510262030_initial_schema.up.sql",
409				})
410
411				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
412				err := migrator.Up(nil, nil)
413				Expect(err).NotTo(HaveOccurred())
414
415				err = migrator.Up(nil, nil)
416				Expect(err).NotTo(HaveOccurred())
417
418				ExpectDatabaseMigrationVersionToEqual(migrator, initialSchemaVersion)
419
420				ExpectMigrationVersionTableNotToExist(db)
421
422				ExpectToBeAbleToInsertData(db)
423			})
424
425			It("Locks the database so multiple ATCs don't all run migrations at the same time", func() {
426				SetupMigrationsHistoryTableToExistAtVersion(db, 1510262030)
427
428				SetupSchemaFromFile(db, "migrations/1510262030_initial_schema.up.sql")
429
430				bindata.AssetNamesReturns([]string{
431					"1510262030_initial_schema.up.sql",
432				})
433				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
434
435				var wg sync.WaitGroup
436				wg.Add(3)
437
438				go TryRunUpAndVerifyResult(db, migrator, &wg)
439				go TryRunUpAndVerifyResult(db, migrator, &wg)
440				go TryRunUpAndVerifyResult(db, migrator, &wg)
441
442				wg.Wait()
443			})
444		})
445
446		Context("golang migrations", func() {
447			It("runs a migration with Migrate", func() {
448
449				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
450				bindata.AssetNamesReturns([]string{
451					"1510262030_initial_schema.up.sql",
452					"1516643303_update_auth_providers.up.go",
453				})
454
455				By("applying the initial migration")
456				err := migrator.Migrate(nil, nil, 1510262030)
457				var columnExists string
458				err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.columns where table_name = 'teams' AND column_name='basic_auth')").Scan(&columnExists)
459				Expect(err).NotTo(HaveOccurred())
460				Expect(columnExists).To(Equal("true"))
461
462				err = migrator.Migrate(nil, nil, 1516643303)
463				Expect(err).NotTo(HaveOccurred())
464
465				By("applying the go migration")
466				err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.columns where table_name = 'teams' AND column_name='basic_auth')").Scan(&columnExists)
467				Expect(err).NotTo(HaveOccurred())
468				Expect(columnExists).To(Equal("false"))
469
470				By("updating the schema migrations table")
471				ExpectDatabaseMigrationVersionToEqual(migrator, 1516643303)
472			})
473
474			It("runs a migration with Up", func() {
475
476				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
477				bindata.AssetNamesReturns([]string{
478					"1510262030_initial_schema.up.sql",
479					"1516643303_update_auth_providers.up.go",
480				})
481
482				err := migrator.Up(nil, nil)
483				Expect(err).NotTo(HaveOccurred())
484
485				By("applying the migration")
486				var columnExists string
487				err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.columns where table_name = 'teams' AND column_name='basic_auth')").Scan(&columnExists)
488				Expect(err).NotTo(HaveOccurred())
489				Expect(columnExists).To(Equal("false"))
490
491				By("updating the schema migrations table")
492				ExpectDatabaseMigrationVersionToEqual(migrator, 1516643303)
493			})
494		})
495	})
496
497	Context("Downgrade", func() {
498		Context("Downgrades to a version that uses the old mattes/migrate schema_migrations table", func() {
499			It("Downgrades to a given version and write it to a new created schema_migrations table", func() {
500				bindata.AssetNamesReturns([]string{
501					"1510262030_initial_schema.up.sql",
502					"1510670987_update_unique_constraint_for_resource_caches.up.sql",
503					"1510670987_update_unique_constraint_for_resource_caches.down.sql",
504				})
505				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
506
507				err := migrator.Up(nil, nil)
508				Expect(err).NotTo(HaveOccurred())
509
510				currentVersion, err := migrator.CurrentVersion()
511				Expect(err).NotTo(HaveOccurred())
512				Expect(currentVersion).To(Equal(upgradedSchemaVersion))
513
514				err = migrator.Migrate(nil, nil, initialSchemaVersion)
515				Expect(err).NotTo(HaveOccurred())
516
517				currentVersion, err = migrator.CurrentVersion()
518				Expect(err).NotTo(HaveOccurred())
519				Expect(currentVersion).To(Equal(initialSchemaVersion))
520
521				ExpectDatabaseVersionToEqual(db, initialSchemaVersion, "schema_migrations")
522
523				ExpectToBeAbleToInsertData(db)
524			})
525
526			It("Downgrades to a given version and write it to the existing schema_migrations table with dirty true", func() {
527
528				bindata.AssetNamesReturns([]string{
529					"1510262030_initial_schema.up.sql",
530					"1510670987_update_unique_constraint_for_resource_caches.up.sql",
531					"1510670987_update_unique_constraint_for_resource_caches.down.sql",
532				})
533				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
534
535				err := migrator.Up(nil, nil)
536				Expect(err).NotTo(HaveOccurred())
537
538				currentVersion, err := migrator.CurrentVersion()
539				Expect(err).NotTo(HaveOccurred())
540				Expect(currentVersion).To(Equal(upgradedSchemaVersion))
541
542				SetupSchemaMigrationsTable(db, 8878, true)
543
544				err = migrator.Migrate(nil, nil, initialSchemaVersion)
545				Expect(err).NotTo(HaveOccurred())
546
547				currentVersion, err = migrator.CurrentVersion()
548				Expect(err).NotTo(HaveOccurred())
549				Expect(currentVersion).To(Equal(initialSchemaVersion))
550
551				ExpectDatabaseVersionToEqual(db, initialSchemaVersion, "schema_migrations")
552
553				ExpectToBeAbleToInsertData(db)
554			})
555		})
556
557		Context("Downgrades to a version with new migrations_history table", func() {
558			It("Downgrades to a given version", func() {
559				bindata.AssetNamesReturns([]string{
560					"1510262030_initial_schema.up.sql",
561					"1510670987_update_unique_constraint_for_resource_caches.up.sql",
562					"1510670987_update_unique_constraint_for_resource_caches.down.sql",
563				})
564				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
565
566				err := migrator.Up(nil, nil)
567				Expect(err).NotTo(HaveOccurred())
568
569				currentVersion, err := migrator.CurrentVersion()
570				Expect(err).NotTo(HaveOccurred())
571				Expect(currentVersion).To(Equal(upgradedSchemaVersion))
572
573				err = migrator.Migrate(nil, nil, initialSchemaVersion)
574				Expect(err).NotTo(HaveOccurred())
575
576				currentVersion, err = migrator.CurrentVersion()
577				Expect(err).NotTo(HaveOccurred())
578				Expect(currentVersion).To(Equal(initialSchemaVersion))
579
580				ExpectToBeAbleToInsertData(db)
581			})
582
583			It("Doesn't fail if already at the requested version", func() {
584				bindata.AssetNamesReturns([]string{
585					"1510262030_initial_schema.up.sql",
586					"1510670987_update_unique_constraint_for_resource_caches.up.sql",
587				})
588				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
589
590				err := migrator.Migrate(nil, nil, upgradedSchemaVersion)
591				Expect(err).NotTo(HaveOccurred())
592
593				currentVersion, err := migrator.CurrentVersion()
594				Expect(err).NotTo(HaveOccurred())
595				Expect(currentVersion).To(Equal(upgradedSchemaVersion))
596
597				err = migrator.Migrate(nil, nil, upgradedSchemaVersion)
598				Expect(err).NotTo(HaveOccurred())
599
600				currentVersion, err = migrator.CurrentVersion()
601				Expect(err).NotTo(HaveOccurred())
602				Expect(currentVersion).To(Equal(upgradedSchemaVersion))
603
604				ExpectToBeAbleToInsertData(db)
605			})
606
607			It("Locks the database so multiple consumers don't run downgrade at the same time", func() {
608				migrator := migration.NewMigratorForMigrations(db, lockFactory, bindata)
609				bindata.AssetNamesReturns([]string{
610					"1510262030_initial_schema.up.sql",
611					"1510670987_update_unique_constraint_for_resource_caches.up.sql",
612					"1510670987_update_unique_constraint_for_resource_caches.down.sql",
613				})
614
615				err := migrator.Up(nil, nil)
616				Expect(err).NotTo(HaveOccurred())
617
618				var wg sync.WaitGroup
619				wg.Add(3)
620
621				go TryRunMigrateAndVerifyResult(db, migrator, initialSchemaVersion, &wg)
622				go TryRunMigrateAndVerifyResult(db, migrator, initialSchemaVersion, &wg)
623				go TryRunMigrateAndVerifyResult(db, migrator, initialSchemaVersion, &wg)
624
625				wg.Wait()
626			})
627		})
628	})
629
630})
631
632func TryRunUpAndVerifyResult(db *sql.DB, migrator migration.Migrator, wg *sync.WaitGroup) {
633	defer GinkgoRecover()
634	defer wg.Done()
635
636	err := migrator.Up(nil, nil)
637	Expect(err).NotTo(HaveOccurred())
638
639	ExpectDatabaseMigrationVersionToEqual(migrator, initialSchemaVersion)
640
641	ExpectToBeAbleToInsertData(db)
642}
643
644func TryRunMigrateAndVerifyResult(db *sql.DB, migrator migration.Migrator, version int, wg *sync.WaitGroup) {
645	defer GinkgoRecover()
646	defer wg.Done()
647
648	err := migrator.Migrate(nil, nil, version)
649	Expect(err).NotTo(HaveOccurred())
650
651	ExpectDatabaseMigrationVersionToEqual(migrator, version)
652
653	ExpectToBeAbleToInsertData(db)
654}
655
656func SetupMigrationsHistoryTableToExistAtVersion(db *sql.DB, version int) {
657	_, err := db.Exec(`CREATE TABLE migrations_history(version bigint, tstamp timestamp with time zone, direction varchar, status varchar, dirty boolean)`)
658	Expect(err).NotTo(HaveOccurred())
659
660	_, err = db.Exec(`INSERT INTO migrations_history(version, tstamp, direction, status, dirty) VALUES($1, current_timestamp, 'up', 'passed', false)`, version)
661	Expect(err).NotTo(HaveOccurred())
662}
663
664func SetupSchemaMigrationsTable(db *sql.DB, version int, dirty bool) {
665	_, err := db.Exec("CREATE TABLE IF NOT EXISTS schema_migrations (version bigint, dirty boolean)")
666	Expect(err).NotTo(HaveOccurred())
667	_, err = db.Exec("INSERT INTO schema_migrations (version, dirty) VALUES ($1, $2)", version, dirty)
668	Expect(err).NotTo(HaveOccurred())
669}
670
671func SetupSchemaFromFile(db *sql.DB, path string) {
672	migrations, err := ioutil.ReadFile(path)
673	Expect(err).NotTo(HaveOccurred())
674
675	for _, migration := range strings.Split(string(migrations), ";") {
676		_, err = db.Exec(migration)
677		Expect(err).NotTo(HaveOccurred())
678	}
679}
680
681func ExpectDatabaseMigrationVersionToEqual(migrator migration.Migrator, expectedVersion int) {
682	var dbVersion int
683	dbVersion, err := migrator.CurrentVersion()
684	Expect(err).NotTo(HaveOccurred())
685	Expect(dbVersion).To(Equal(expectedVersion))
686}
687
688func ExpectToBeAbleToInsertData(dbConn *sql.DB) {
689	rand.Seed(time.Now().UnixNano())
690
691	teamID := rand.Intn(10000)
692	_, err := dbConn.Exec("INSERT INTO teams(id, name) VALUES ($1, $2)", teamID, strconv.Itoa(teamID))
693	Expect(err).NotTo(HaveOccurred())
694
695	pipelineID := rand.Intn(10000)
696	_, err = dbConn.Exec("INSERT INTO pipelines(id, team_id, name) VALUES ($1, $2, $3)", pipelineID, teamID, strconv.Itoa(pipelineID))
697	Expect(err).NotTo(HaveOccurred())
698
699	jobID := rand.Intn(10000)
700	_, err = dbConn.Exec("INSERT INTO jobs(id, pipeline_id, name, config) VALUES ($1, $2, $3, '{}')", jobID, pipelineID, strconv.Itoa(jobID))
701	Expect(err).NotTo(HaveOccurred())
702}
703
704func ExpectMigrationToHaveFailed(dbConn *sql.DB, failedVersion int, expectDirty bool) {
705	var status string
706	var dirty bool
707	err := dbConn.QueryRow("SELECT status, dirty FROM migrations_history WHERE version=$1 ORDER BY tstamp desc LIMIT 1", failedVersion).Scan(&status, &dirty)
708	Expect(err).NotTo(HaveOccurred())
709	Expect(status).To(Equal("failed"))
710	Expect(dirty).To(Equal(expectDirty))
711}
712