1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4 5package upgrade_tests 6 7import ( 8 "bytes" 9 "context" 10 "flag" 11 "fmt" 12 "io" 13 "io/ioutil" 14 "net" 15 "net/http" 16 "os" 17 "os/exec" 18 "reflect" 19 "testing" 20 21 "github.com/blang/semver/v4" 22 "github.com/docker/go-connections/nat" 23 "github.com/gogo/protobuf/proto" 24 "github.com/golang/snappy" 25 "github.com/jackc/pgx/v4" 26 "github.com/jackc/pgx/v4/pgxpool" 27 _ "github.com/jackc/pgx/v4/stdlib" 28 "github.com/testcontainers/testcontainers-go" 29 "github.com/timescale/promscale/pkg/internal/testhelpers" 30 "github.com/timescale/promscale/pkg/log" 31 "github.com/timescale/promscale/pkg/pgmodel" 32 "github.com/timescale/promscale/pkg/pgmodel/common/extension" 33 "github.com/timescale/promscale/pkg/pgmodel/ingestor" 34 "github.com/timescale/promscale/pkg/pgmodel/model" 35 "github.com/timescale/promscale/pkg/pgxconn" 36 "github.com/timescale/promscale/pkg/prompb" 37 "github.com/timescale/promscale/pkg/runner" 38 tput "github.com/timescale/promscale/pkg/util/throughput" 39 "github.com/timescale/promscale/pkg/version" 40) 41 42var ( 43 testDatabase = flag.String("database", "tmp_db_timescale_upgrade_test", "database to run integration tests on") 44 useExtension = flag.Bool("use-extension", true, "use the promscale extension") 45 printLogs = flag.Bool("print-logs", false, "print TimescaleDB logs") 46 baseExtensionState testhelpers.ExtensionState 47) 48 49func init() { 50 tput.InitWatcher(0) 51} 52 53func TestMain(m *testing.M) { 54 var code int 55 flag.Parse() 56 baseExtensionState.UseTimescaleDB() 57 baseExtensionState.UseTimescaleDB2() 58 baseExtensionState.UsePG12() 59 if *useExtension { 60 baseExtensionState.UsePromscale() 61 } 62 _ = log.Init(log.Config{ 63 Level: "debug", 64 }) 65 code = m.Run() 66 os.Exit(code) 67} 68 69/* Prev image is the db image with the old promscale extension. We do NOT test timescaleDB extension upgrades here. */ 70func getDBImages(extensionState testhelpers.ExtensionState) (prev string, clean string) { 71 if !extensionState.UsesPG12() { 72 //TODO add tests after release with PG13 support 73 panic("Can't test PG13 yet because haven't had a PG13 release") 74 } 75 switch { 76 case extensionState.UsesMultinode(): 77 return "timescaledev/promscale-extension:0.1.1-ts2-pg12", testhelpers.LatestDBWithPromscaleImageBase + ":latest-ts2-pg12" 78 case !extensionState.UsesTimescaleDB(): 79 return "timescale/timescaledb:latest-pg12", "timescale/timescaledb:latest-pg12" 80 case extensionState.UsesTimescale2(): 81 return "timescaledev/promscale-extension:0.1.1-ts2-pg12", testhelpers.LatestDBWithPromscaleImageBase + ":latest-ts2-pg12" 82 default: 83 return "timescaledev/promscale-extension:0.1.1-ts1-pg12", testhelpers.LatestDBWithPromscaleImageBase + ":latest-ts1-pg12" 84 } 85} 86 87func TestUpgradeFromPrev(t *testing.T) { 88 upgradedDbInfo := getUpgradedDbInfo(t, false, false, baseExtensionState) 89 pristineDbInfo := getPristineDbInfo(t, false, baseExtensionState) 90 91 if !reflect.DeepEqual(pristineDbInfo, upgradedDbInfo) { 92 PrintDbSnapshotDifferences(t, pristineDbInfo, upgradedDbInfo) 93 } 94} 95 96func TestUpgradeFromEarliest(t *testing.T) { 97 upgradedDbInfo := getUpgradedDbInfo(t, false, true, baseExtensionState) 98 pristineDbInfo := getPristineDbInfo(t, false, baseExtensionState) 99 100 if !reflect.DeepEqual(pristineDbInfo, upgradedDbInfo) { 101 PrintDbSnapshotDifferences(t, pristineDbInfo, upgradedDbInfo) 102 } 103} 104 105func TestUpgradeFromEarliestMultinode(t *testing.T) { 106 extState := baseExtensionState 107 extState.UseMultinode() 108 upgradedDbInfo := getUpgradedDbInfo(t, false, true, extState) 109 pristineDbInfo := getPristineDbInfo(t, false, extState) 110 111 if !reflect.DeepEqual(pristineDbInfo, upgradedDbInfo) { 112 PrintDbSnapshotDifferences(t, pristineDbInfo, upgradedDbInfo) 113 } 114} 115 116// TestUpgradeFromPrevNoData tests migrations with no ingested data. 117// See issue: https://github.com/timescale/promscale/issues/330 118func TestUpgradeFromEarliestNoData(t *testing.T) { 119 upgradedDbInfo := getUpgradedDbInfo(t, true, true, baseExtensionState) 120 pristineDbInfo := getPristineDbInfo(t, true, baseExtensionState) 121 122 if !reflect.DeepEqual(pristineDbInfo, upgradedDbInfo) { 123 PrintDbSnapshotDifferences(t, pristineDbInfo, upgradedDbInfo) 124 } 125} 126 127func getUpgradedDbInfo(t *testing.T, noData bool, useEarliest bool, extensionState testhelpers.ExtensionState) (upgradedDbInfo dbSnapshot) { 128 // We test that upgrading from both the earliest and the directly-previous versions works 129 // While it may seem that the earliest version is sufficient, idempotent scripts are only 130 // run on each completed updated and so testing the upgrade as it relates to the last idempotent 131 // state is important. To see why we need both tests, think of the following example: 132 // 133 // Say you have an earliest version 1 and a new version 3. In version 2 you introduce procedure foo(). that you 134 // drop in version 3. 135 // DROP FUNCTION IF EXISTS foo() (wrong since foo is procedure not function), would pass the earlier->latest test since 136 // version 1 has no function foo, and would only be caught in prev->latest test. 137 // DROP PROCEDURE foo() (wrong since missing IF NOT EXISTS), would pass the prev->latest test but would be caught in the 138 // earliest->latest test. 139 prevVersion := semver.MustParse(version.PrevReleaseVersion) 140 if useEarliest { 141 prevVersion = semver.MustParse(version.EarliestUpgradeTestVersion) 142 if extensionState.UsesMultinode() || extensionState.UsesTimescale2() { 143 prevVersion = semver.MustParse(version.EarliestUpgradeTestVersionMultinode) 144 } 145 } 146 // TODO we could probably improve performance of this test by 2x if we 147 // gathered the db info in parallel. Unfortunately our db runner doesn't 148 // support this yet 149 withDBStartingAtOldVersionAndUpgrading(t, *testDatabase, prevVersion, extensionState, 150 /* preUpgrade */ 151 func(dbContainer testcontainers.Container, dbTmpDir string, connectorHost string, connectorPort nat.Port) { 152 if noData { 153 return 154 } 155 client := http.Client{} 156 defer client.CloseIdleConnections() 157 158 writeUrl := fmt.Sprintf("http://%s/write", net.JoinHostPort(connectorHost, connectorPort.Port())) 159 160 doWrite(t, &client, writeUrl, preUpgradeData1, preUpgradeData2) 161 }, 162 /* postUpgrade */ 163 func(dbContainer testcontainers.Container, dbTmpDir string) { 164 connectURL := testhelpers.PgConnectURL(*testDatabase, testhelpers.NoSuperuser) 165 166 db, err := pgxpool.Connect(context.Background(), connectURL) 167 if err != nil { 168 t.Fatal(err) 169 } 170 defer db.Close() 171 172 if !noData { 173 ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) 174 if err != nil { 175 t.Fatalf("error connecting to DB: %v", err) 176 } 177 178 doIngest(t, ing, postUpgradeData1, postUpgradeData2) 179 180 ing.Close() 181 182 } 183 upgradedDbInfo = SnapshotDB(t, dbContainer, *testDatabase, dbTmpDir, db, extensionState) 184 }) 185 return 186} 187 188func getPristineDbInfo(t *testing.T, noData bool, extensionState testhelpers.ExtensionState) (pristineDbInfo dbSnapshot) { 189 withNewDBAtCurrentVersion(t, *testDatabase, extensionState, 190 /* preRestart */ 191 func(container testcontainers.Container, _ string, db *pgxpool.Pool, tmpDir string) { 192 if noData { 193 return 194 } 195 ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) 196 if err != nil { 197 t.Fatalf("error connecting to DB: %v", err) 198 } 199 defer ing.Close() 200 201 doIngest(t, ing, preUpgradeData1, preUpgradeData2) 202 }, 203 /* postRestart */ 204 func(container testcontainers.Container, _ string, db *pgxpool.Pool, tmpDir string) { 205 if !noData { 206 ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) 207 if err != nil { 208 t.Fatalf("error connecting to DB: %v", err) 209 } 210 defer ing.Close() 211 212 doIngest(t, ing, postUpgradeData1, postUpgradeData2) 213 } 214 pristineDbInfo = SnapshotDB(t, container, *testDatabase, tmpDir, db, extensionState) 215 }) 216 return 217} 218 219// pick a start time in the future so data won't get compressed 220const startTime = 6600000000000 // approx 210 years after the epoch 221var ( 222 preUpgradeData1 = []prompb.TimeSeries{ 223 { 224 Labels: []prompb.Label{ 225 {Name: model.MetricNameLabelName, Value: "test"}, 226 {Name: "test", Value: "test"}, 227 }, 228 Samples: []prompb.Sample{ 229 {Timestamp: startTime + 1, Value: 0.1}, 230 {Timestamp: startTime + 2, Value: 0.2}, 231 }, 232 }, 233 } 234 preUpgradeData2 = []prompb.TimeSeries{ 235 { 236 Labels: []prompb.Label{ 237 {Name: model.MetricNameLabelName, Value: "test2"}, 238 {Name: "foo", Value: "bar"}, 239 }, 240 Samples: []prompb.Sample{ 241 {Timestamp: startTime + 4, Value: 2.2}, 242 }, 243 }, 244 } 245 246 postUpgradeData1 = []prompb.TimeSeries{ 247 { 248 Labels: []prompb.Label{ 249 {Name: model.MetricNameLabelName, Value: "test"}, 250 {Name: "testB", Value: "testB"}, 251 }, 252 Samples: []prompb.Sample{ 253 {Timestamp: startTime + 4, Value: 0.4}, 254 {Timestamp: startTime + 5, Value: 0.5}, 255 }, 256 }, 257 } 258 postUpgradeData2 = []prompb.TimeSeries{ 259 { 260 Labels: []prompb.Label{ 261 {Name: model.MetricNameLabelName, Value: "test3"}, 262 {Name: "baz", Value: "quf"}, 263 }, 264 Samples: []prompb.Sample{ 265 {Timestamp: startTime + 66, Value: 6.0}, 266 }, 267 }, 268 } 269) 270 271func addNode2(t testing.TB, DBName string) { 272 db, err := pgx.Connect(context.Background(), testhelpers.PgConnectURL(DBName, testhelpers.Superuser)) 273 if err != nil { 274 t.Fatal(err) 275 } 276 err = testhelpers.AddDataNode2(db, DBName) 277 if err != nil { 278 t.Fatal(err) 279 } 280 if err = db.Close(context.Background()); err != nil { 281 t.Fatal(err) 282 } 283 284 //do this as prom user 285 dbProm, err := pgx.Connect(context.Background(), testhelpers.PgConnectURL(DBName, testhelpers.NoSuperuser)) 286 if err != nil { 287 t.Fatal(err) 288 } 289 _, err = dbProm.Exec(context.Background(), "CALL add_prom_node('dn1');") 290 if err != nil { 291 t.Fatal(err) 292 } 293 if err = db.Close(context.Background()); err != nil { 294 t.Fatal(err) 295 } 296} 297 298// Start a db with the prev extra extension and a prev connector as well. 299// This ensures that we test upgrades of both the extension and the connector schema. 300// Then run preUpgrade and shut everything down. 301// Start a new db with the latest extra extension and migrate to the latest version of the connector schema. 302// Then run postUpgrade. 303func withDBStartingAtOldVersionAndUpgrading( 304 t testing.TB, 305 DBName string, 306 prevVersion semver.Version, 307 extensionState testhelpers.ExtensionState, 308 preUpgrade func(dbContainer testcontainers.Container, dbTmpDir string, connectorHost string, connectorPort nat.Port), 309 postUpgrade func(dbContainer testcontainers.Container, dbTmpDir string)) { 310 var err error 311 ctx := context.Background() 312 313 tmpDir, err := testhelpers.TempDir("update_test_out") 314 if err != nil { 315 log.Fatal(err) 316 } 317 318 dataDir, err := testhelpers.TempDir("update_test_data") 319 if err != nil { 320 log.Fatal(err) 321 } 322 323 prevDBImage, cleanImage := getDBImages(extensionState) 324 // Start a db with the prev extension and a prev connector as well 325 // Then run preUpgrade and shut everything down. 326 func() { 327 dbContainer, closer, err := testhelpers.StartDatabaseImage(ctx, prevDBImage, tmpDir, dataDir, *printLogs, extensionState) 328 if err != nil { 329 t.Fatal("Error setting up container", err) 330 } 331 332 defer func() { _ = closer.Close() }() 333 334 db, err := testhelpers.DbSetup(*testDatabase, testhelpers.NoSuperuser, true, extensionState) 335 if err != nil { 336 t.Fatal(err) 337 return 338 } 339 db.Close() 340 341 connectorImage := "timescale/promscale:" + prevVersion.String() 342 connector, err := testhelpers.StartConnectorWithImage(context.Background(), dbContainer, connectorImage, *printLogs, []string{}, *testDatabase) 343 if err != nil { 344 log.Fatal(err.Error()) 345 } 346 defer testhelpers.StopContainer(ctx, connector, *printLogs) 347 348 connectorHost, err := connector.Host(ctx) 349 if err != nil { 350 t.Fatal(err) 351 return 352 } 353 354 connectorPort, err := connector.MappedPort(ctx, testhelpers.ConnectorPort) 355 if err != nil { 356 t.Fatal(err) 357 return 358 } 359 t.Logf("Running preUpgrade with old version of connector and db: connector=%v db=%v", connectorImage, prevDBImage) 360 preUpgrade(dbContainer, tmpDir, connectorHost, connectorPort) 361 }() 362 363 //Start a new connector and migrate. 364 //Then run postUpgrade 365 dbContainer, closer, err := testhelpers.StartDatabaseImage(ctx, cleanImage, tmpDir, dataDir, *printLogs, extensionState) 366 if err != nil { 367 t.Fatal("Error setting up container", err) 368 } 369 370 defer func() { _ = closer.Close() }() 371 372 t.Logf("upgrading versions %v => %v", prevVersion, version.Promscale) 373 connectURL := testhelpers.PgConnectURL(*testDatabase, testhelpers.NoSuperuser) 374 migrateToVersion(t, connectURL, version.Promscale, "azxtestcommit") 375 376 if extensionState.UsesMultinode() { 377 //add a node after upgrade; this tests strictly more functionality since we already have one node set up before 378 addNode2(t, *testDatabase) 379 } 380 t.Log("Running postUpgrade") 381 postUpgrade(dbContainer, tmpDir) 382 383} 384 385// Run a DB and connector at the current version. Run preRestart then restart the db 386// then run postRestart. A restart is necessary because we need a restart in the 387// upgrade path to change the extension that is available. But, a restart causes 388// Sequences to skip values. So, in order to have equivalent data, we need to make 389// sure that both the upgrade and this pristine path both have restarts. 390func withNewDBAtCurrentVersion(t testing.TB, DBName string, extensionState testhelpers.ExtensionState, 391 preRestart func(container testcontainers.Container, connectURL string, db *pgxpool.Pool, tmpDir string), 392 postRestart func(container testcontainers.Container, connectURL string, db *pgxpool.Pool, tmpDir string)) { 393 var err error 394 ctx := context.Background() 395 396 tmpDir, err := testhelpers.TempDir("update_test_out") 397 if err != nil { 398 log.Fatal(err) 399 } 400 dataDir, err := testhelpers.TempDir("update_test_data") 401 if err != nil { 402 log.Fatal(err) 403 } 404 405 _, cleanImage := getDBImages(extensionState) 406 407 func() { 408 container, closer, err := testhelpers.StartDatabaseImage(ctx, cleanImage, tmpDir, dataDir, *printLogs, extensionState) 409 if err != nil { 410 fmt.Println("Error setting up container", err) 411 os.Exit(1) 412 } 413 414 defer func() { _ = closer.Close() }() 415 testhelpers.WithDB(t, DBName, testhelpers.NoSuperuser, true, extensionState, func(_ *pgxpool.Pool, t testing.TB, connectURL string) { 416 migrateToVersion(t, connectURL, version.Promscale, "azxtestcommit") 417 418 // need to get a new pool after the Migrate to catch any GUC changes made during Migrate 419 db, err := pgxpool.Connect(context.Background(), connectURL) 420 if err != nil { 421 t.Fatal(err) 422 } 423 defer db.Close() 424 preRestart(container, connectURL, db, tmpDir) 425 }) 426 }() 427 container, closer, err := testhelpers.StartDatabaseImage(ctx, cleanImage, tmpDir, dataDir, *printLogs, extensionState) 428 if err != nil { 429 fmt.Println("Error setting up container", err) 430 os.Exit(1) 431 } 432 433 if extensionState.UsesMultinode() { 434 addNode2(t, *testDatabase) 435 } 436 defer func() { _ = closer.Close() }() 437 connectURL := testhelpers.PgConnectURL(*testDatabase, testhelpers.NoSuperuser) 438 db, err := pgxpool.Connect(context.Background(), connectURL) 439 if err != nil { 440 t.Fatal(err) 441 } 442 defer db.Close() 443 postRestart(container, connectURL, db, tmpDir) 444} 445 446func migrateToVersion(t testing.TB, connectURL string, version string, commitHash string) { 447 err := extension.InstallUpgradeTimescaleDBExtensions(connectURL, extension.ExtensionMigrateOptions{Install: true, Upgrade: true, UpgradePreRelease: true}) 448 if err != nil { 449 t.Fatal(err) 450 } 451 452 migratePool, err := pgx.Connect(context.Background(), connectURL) 453 if err != nil { 454 t.Fatal(err) 455 } 456 defer func() { _ = migratePool.Close(context.Background()) }() 457 err = runner.SetupDBState(migratePool, pgmodel.VersionInfo{Version: version, CommitHash: commitHash}, nil, extension.ExtensionMigrateOptions{Install: true, Upgrade: true, UpgradePreRelease: true}) 458 if err != nil { 459 t.Fatal(err) 460 } 461} 462 463func tsWriteReq(ts []prompb.TimeSeries) prompb.WriteRequest { 464 return prompb.WriteRequest{ 465 Timeseries: ts, 466 } 467} 468 469func writeReqToHttp(r prompb.WriteRequest) *bytes.Reader { 470 data, _ := proto.Marshal(&r) 471 body := snappy.Encode(nil, data) 472 return bytes.NewReader(body) 473} 474 475func doWrite(t *testing.T, client *http.Client, url string, data ...[]prompb.TimeSeries) { 476 for _, data := range data { 477 body := writeReqToHttp(tsWriteReq(copyMetrics(data))) 478 req, err := http.NewRequest("POST", url, body) 479 if err != nil { 480 t.Errorf("Error creating request: %v", err) 481 } 482 req.Header.Add("Content-Encoding", "snappy") 483 req.Header.Set("Content-Type", "application/x-protobuf") 484 req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") 485 resp, err := client.Do(req) 486 if err != nil { 487 t.Fatal(err) 488 } 489 if resp.StatusCode != 200 { 490 t.Fatal("non-ok status:", resp.Status) 491 } 492 493 _, _ = io.Copy(ioutil.Discard, resp.Body) 494 resp.Body.Close() 495 } 496} 497 498func doIngest(t *testing.T, ingstr *ingestor.DBIngestor, data ...[]prompb.TimeSeries) { 499 for _, data := range data { 500 wr := ingestor.NewWriteRequest() 501 wr.Timeseries = copyMetrics(data) 502 _, _, err := ingstr.Ingest(wr) 503 if err != nil { 504 t.Fatalf("ingest error: %v", err) 505 } 506 _ = ingstr.CompleteMetricCreation() 507 } 508} 509 510// deep copy the metrics since we mutate them, and don't want to invalidate the tests 511func copyMetrics(metrics []prompb.TimeSeries) []prompb.TimeSeries { 512 out := make([]prompb.TimeSeries, len(metrics)) 513 copy(out, metrics) 514 for i := range out { 515 out[i].Labels = make([]prompb.Label, len(metrics[i].Labels)) 516 out[i].Samples = make([]prompb.Sample, len(metrics[i].Samples)) 517 copy(out[i].Labels, metrics[i].Labels) 518 copy(out[i].Samples, metrics[i].Samples) 519 } 520 return out 521} 522 523func TestExtensionUpgrade(t *testing.T) { 524 var err error 525 var version string 526 527 if true { 528 t.Skip("Temporarily disabled test") 529 } 530 531 ctx := context.Background() 532 533 buildPromscaleImageFromRepo(t) 534 _, dbContainer, closer := startDB(t, ctx) 535 defer closer.Close() 536 537 // as the default installed version ext is rc4 in the test image downgrade it to rc2 538 // to test upgrade flow. 539 extVersion := "2.0.0-rc2" 540 dropAndCreateExt(t, ctx, extVersion) 541 542 db, err := pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser)) 543 if err != nil { 544 t.Fatal(err) 545 } 546 547 err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version) 548 if err != nil { 549 t.Fatal(err) 550 } 551 552 if version != extVersion { 553 t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension false") 554 } 555 556 // start promscale & test upgrade-prerelease-extensions as false 557 // Now the ext is rc2 it should be rc2 after promscale startup too 558 func() { 559 connectorImage := "timescale/promscale:latest" 560 databaseName := "postgres" 561 connector, err := testhelpers.StartConnectorWithImage(ctx, dbContainer, connectorImage, *printLogs, []string{}, databaseName) 562 if err != nil { 563 t.Fatal(err) 564 } 565 defer testhelpers.StopContainer(ctx, connector, *printLogs) 566 err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version) 567 if err != nil { 568 t.Fatal(err) 569 } 570 571 if version != extVersion { 572 t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension false") 573 } 574 t.Logf("successfully tested extension upgrade flow with --upgrade-prereleases-extensions false") 575 }() 576 577 db.Close(ctx) 578 579 // start a new connector and test --upgrade-prerelease-extensions as true 580 // the default installed ext version is rc2 now it should upgrade it to rc4 581 func() { 582 connectorImage := "timescale/promscale:latest" 583 databaseName := "postgres" 584 flags := []string{"-upgrade-prerelease-extensions", "true"} 585 connector, err := testhelpers.StartConnectorWithImage(ctx, dbContainer, connectorImage, *printLogs, flags, databaseName) 586 if err != nil { 587 t.Fatal(err) 588 } 589 defer testhelpers.StopContainer(ctx, connector, *printLogs) 590 591 var versionStr string 592 db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser)) 593 if err != nil { 594 t.Fatal(err) 595 } 596 err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&versionStr) 597 if err != nil { 598 t.Fatal(err) 599 } 600 601 db.Close(ctx) 602 603 if versionStr != "2.0.0-rc4" { 604 t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension true") 605 } 606 t.Logf("successfully tested extension upgrade flow with --upgrade-prereleases-extensions true") 607 }() 608} 609 610func TestMigrationFailure(t *testing.T) { 611 var err error 612 var version string 613 614 if true { 615 t.Skip("Temporarily disabled test") 616 } 617 ctx := context.Background() 618 619 buildPromscaleImageFromRepo(t) 620 db, dbContainer, closer := startDB(t, ctx) 621 defer closer.Close() 622 623 // start promscale & test upgrade-extensions as false 624 func() { 625 connectorImage := "timescale/promscale:latest" 626 databaseName := "postgres" 627 connector, err := testhelpers.StartConnectorWithImage(ctx, dbContainer, connectorImage, *printLogs, []string{}, databaseName) 628 if err != nil { 629 t.Fatal(err) 630 } 631 defer testhelpers.StopContainer(ctx, connector, *printLogs) 632 633 err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version) 634 if err != nil { 635 t.Fatal(err) 636 } 637 638 db.Close(ctx) 639 if version != "2.0.0-rc4" { 640 t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension false") 641 } 642 t.Logf("successfully tested extension upgrade flow with --upgrade-prereleases-extensions false.") 643 }() 644 645 // As the timescaleDB installed version is rc4, lets install the 1.7.3 ext version 646 extVersion := "1.7.3" 647 dropAndCreateExt(t, ctx, extVersion) 648 649 db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser)) 650 if err != nil { 651 t.Fatal(err) 652 } 653 654 err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version) 655 if err != nil { 656 t.Fatal(err) 657 } 658 659 db.Close(ctx) 660 if version != extVersion { 661 t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension false") 662 } 663 664 // start a new connector and test --upgrade-extensions as true which is by default set in flags 665 // the migration should fail (upgrade path in tsdb isn't available) but promscale should be running. 666 func() { 667 connectorImage := "timescale/promscale:latest" 668 databaseName := "postgres" 669 connector, err := testhelpers.StartConnectorWithImage(ctx, dbContainer, connectorImage, *printLogs, []string{}, databaseName) 670 if err != nil { 671 t.Fatal(err) 672 } 673 defer testhelpers.StopContainer(ctx, connector, *printLogs) 674 675 var version string 676 db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser)) 677 if err != nil { 678 t.Fatal(err) 679 } 680 err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version) 681 if err != nil { 682 t.Fatal(err) 683 } 684 db.Close(ctx) 685 686 if version != "1.7.3" { 687 t.Fatal("failed to verify timescaleDB extension version") 688 } 689 690 // Now from the check we are know that migration failed from 1.7.3 to 1.7.4 691 // as the upgrade script doesn't exist within timescaleDB image. 692 // Now check promscale is still running on migration failure. 693 exitCode, err := connector.Exec(context.Background(), []string{"echo", "hello"}) 694 if exitCode != 0 || err != nil { 695 t.Fatal("promscale failed to run extension migration failure", err) 696 } 697 t.Logf("successfully tested extension upgrade flow with --upgrade-prereleases-extensions true where migration fails and promscale keeps running.") 698 }() 699} 700 701func buildPromscaleImageFromRepo(t *testing.T) { 702 t.Logf("building promscale image from the codebase") 703 cmd := exec.Command("docker", "build", "-t", "timescale/promscale:latest", "./../../../", "--file", "./../../../build/Dockerfile") 704 err := cmd.Run() 705 if err != nil { 706 t.Fatal(err) 707 } 708 t.Logf("successfully built promscale:latest image from codebase.") 709} 710 711func startDB(t *testing.T, ctx context.Context) (*pgx.Conn, testcontainers.Container, io.Closer) { 712 tmpDir, err := testhelpers.TempDir("update_test_out") 713 if err != nil { 714 t.Fatal(err) 715 } 716 717 dataDir, err := testhelpers.TempDir("update_test_data") 718 if err != nil { 719 t.Fatal(err) 720 } 721 722 dbContainer, closer, err := testhelpers.StartDatabaseImage(ctx, "timescaledev/promscale-extension:testing-extension-upgrade", tmpDir, dataDir, *printLogs, testhelpers.Timescale1AndPromscale) 723 if err != nil { 724 t.Fatal("Error setting up container", err) 725 } 726 727 // need to get a new pool after the Migrate to catch any GUC changes made during Migrate 728 db, err := pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser)) 729 if err != nil { 730 t.Fatal(err) 731 } 732 733 return db, dbContainer, closer 734} 735 736func dropAndCreateExt(t *testing.T, ctx context.Context, extVersion string) { 737 // Drop existing installed extension & install a lower extension version to test upgrade 738 db, err := pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser)) 739 if err != nil { 740 t.Fatal(err) 741 } 742 743 _, err = db.Exec(ctx, `DROP EXTENSION timescaledb`) 744 if err != nil { 745 t.Fatal(err) 746 } 747 db.Close(ctx) 748 749 db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser)) 750 if err != nil { 751 t.Fatal(err) 752 } 753 754 _, err = db.Exec(ctx, fmt.Sprintf(`CREATE EXTENSION timescaledb version '%s'`, extVersion)) 755 if err != nil { 756 t.Fatal(err) 757 } 758 db.Close(ctx) 759} 760