1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package upgrade_tests
6
7import (
8	"bytes"
9	"context"
10	"flag"
11	"fmt"
12	"io"
13	"io/ioutil"
14	"net"
15	"net/http"
16	"os"
17	"os/exec"
18	"reflect"
19	"testing"
20
21	"github.com/blang/semver/v4"
22	"github.com/docker/go-connections/nat"
23	"github.com/gogo/protobuf/proto"
24	"github.com/golang/snappy"
25	"github.com/jackc/pgx/v4"
26	"github.com/jackc/pgx/v4/pgxpool"
27	_ "github.com/jackc/pgx/v4/stdlib"
28	"github.com/testcontainers/testcontainers-go"
29	"github.com/timescale/promscale/pkg/internal/testhelpers"
30	"github.com/timescale/promscale/pkg/log"
31	"github.com/timescale/promscale/pkg/pgmodel"
32	"github.com/timescale/promscale/pkg/pgmodel/common/extension"
33	"github.com/timescale/promscale/pkg/pgmodel/ingestor"
34	"github.com/timescale/promscale/pkg/pgmodel/model"
35	"github.com/timescale/promscale/pkg/pgxconn"
36	"github.com/timescale/promscale/pkg/prompb"
37	"github.com/timescale/promscale/pkg/runner"
38	tput "github.com/timescale/promscale/pkg/util/throughput"
39	"github.com/timescale/promscale/pkg/version"
40)
41
42var (
43	testDatabase       = flag.String("database", "tmp_db_timescale_upgrade_test", "database to run integration tests on")
44	useExtension       = flag.Bool("use-extension", true, "use the promscale extension")
45	printLogs          = flag.Bool("print-logs", false, "print TimescaleDB logs")
46	baseExtensionState testhelpers.ExtensionState
47)
48
49func init() {
50	tput.InitWatcher(0)
51}
52
53func TestMain(m *testing.M) {
54	var code int
55	flag.Parse()
56	baseExtensionState.UseTimescaleDB()
57	baseExtensionState.UseTimescaleDB2()
58	baseExtensionState.UsePG12()
59	if *useExtension {
60		baseExtensionState.UsePromscale()
61	}
62	_ = log.Init(log.Config{
63		Level: "debug",
64	})
65	code = m.Run()
66	os.Exit(code)
67}
68
69/* Prev image is the db image with the old promscale extension. We do NOT test timescaleDB extension upgrades here. */
70func getDBImages(extensionState testhelpers.ExtensionState) (prev string, clean string) {
71	if !extensionState.UsesPG12() {
72		//TODO add tests after release with PG13 support
73		panic("Can't test PG13 yet because haven't had a PG13 release")
74	}
75	switch {
76	case extensionState.UsesMultinode():
77		return "timescaledev/promscale-extension:0.1.1-ts2-pg12", testhelpers.LatestDBWithPromscaleImageBase + ":latest-ts2-pg12"
78	case !extensionState.UsesTimescaleDB():
79		return "timescale/timescaledb:latest-pg12", "timescale/timescaledb:latest-pg12"
80	case extensionState.UsesTimescale2():
81		return "timescaledev/promscale-extension:0.1.1-ts2-pg12", testhelpers.LatestDBWithPromscaleImageBase + ":latest-ts2-pg12"
82	default:
83		return "timescaledev/promscale-extension:0.1.1-ts1-pg12", testhelpers.LatestDBWithPromscaleImageBase + ":latest-ts1-pg12"
84	}
85}
86
87func TestUpgradeFromPrev(t *testing.T) {
88	upgradedDbInfo := getUpgradedDbInfo(t, false, false, baseExtensionState)
89	pristineDbInfo := getPristineDbInfo(t, false, baseExtensionState)
90
91	if !reflect.DeepEqual(pristineDbInfo, upgradedDbInfo) {
92		PrintDbSnapshotDifferences(t, pristineDbInfo, upgradedDbInfo)
93	}
94}
95
96func TestUpgradeFromEarliest(t *testing.T) {
97	upgradedDbInfo := getUpgradedDbInfo(t, false, true, baseExtensionState)
98	pristineDbInfo := getPristineDbInfo(t, false, baseExtensionState)
99
100	if !reflect.DeepEqual(pristineDbInfo, upgradedDbInfo) {
101		PrintDbSnapshotDifferences(t, pristineDbInfo, upgradedDbInfo)
102	}
103}
104
105func TestUpgradeFromEarliestMultinode(t *testing.T) {
106	extState := baseExtensionState
107	extState.UseMultinode()
108	upgradedDbInfo := getUpgradedDbInfo(t, false, true, extState)
109	pristineDbInfo := getPristineDbInfo(t, false, extState)
110
111	if !reflect.DeepEqual(pristineDbInfo, upgradedDbInfo) {
112		PrintDbSnapshotDifferences(t, pristineDbInfo, upgradedDbInfo)
113	}
114}
115
116// TestUpgradeFromPrevNoData tests migrations with no ingested data.
117// See issue: https://github.com/timescale/promscale/issues/330
118func TestUpgradeFromEarliestNoData(t *testing.T) {
119	upgradedDbInfo := getUpgradedDbInfo(t, true, true, baseExtensionState)
120	pristineDbInfo := getPristineDbInfo(t, true, baseExtensionState)
121
122	if !reflect.DeepEqual(pristineDbInfo, upgradedDbInfo) {
123		PrintDbSnapshotDifferences(t, pristineDbInfo, upgradedDbInfo)
124	}
125}
126
127func getUpgradedDbInfo(t *testing.T, noData bool, useEarliest bool, extensionState testhelpers.ExtensionState) (upgradedDbInfo dbSnapshot) {
128	// We test that upgrading from both the earliest and the directly-previous versions works
129	// While it may seem that the earliest version is sufficient, idempotent scripts are only
130	// run on each completed updated and so testing the upgrade as it relates to the last idempotent
131	// state is important. To see why we need both tests, think of the following example:
132	//
133	// Say you have an earliest version 1 and a new version 3. In version 2 you introduce procedure foo(). that you
134	// drop in version 3.
135	// DROP FUNCTION IF EXISTS foo() (wrong since foo is procedure not function), would pass the earlier->latest test since
136	// version 1 has no function foo, and would only be caught in prev->latest test.
137	// DROP PROCEDURE foo() (wrong since missing IF NOT EXISTS), would pass the prev->latest test but would be caught in the
138	// earliest->latest test.
139	prevVersion := semver.MustParse(version.PrevReleaseVersion)
140	if useEarliest {
141		prevVersion = semver.MustParse(version.EarliestUpgradeTestVersion)
142		if extensionState.UsesMultinode() || extensionState.UsesTimescale2() {
143			prevVersion = semver.MustParse(version.EarliestUpgradeTestVersionMultinode)
144		}
145	}
146	// TODO we could probably improve performance of this test by 2x if we
147	//      gathered the db info in parallel. Unfortunately our db runner doesn't
148	//      support this yet
149	withDBStartingAtOldVersionAndUpgrading(t, *testDatabase, prevVersion, extensionState,
150		/* preUpgrade */
151		func(dbContainer testcontainers.Container, dbTmpDir string, connectorHost string, connectorPort nat.Port) {
152			if noData {
153				return
154			}
155			client := http.Client{}
156			defer client.CloseIdleConnections()
157
158			writeUrl := fmt.Sprintf("http://%s/write", net.JoinHostPort(connectorHost, connectorPort.Port()))
159
160			doWrite(t, &client, writeUrl, preUpgradeData1, preUpgradeData2)
161		},
162		/* postUpgrade */
163		func(dbContainer testcontainers.Container, dbTmpDir string) {
164			connectURL := testhelpers.PgConnectURL(*testDatabase, testhelpers.NoSuperuser)
165
166			db, err := pgxpool.Connect(context.Background(), connectURL)
167			if err != nil {
168				t.Fatal(err)
169			}
170			defer db.Close()
171
172			if !noData {
173				ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
174				if err != nil {
175					t.Fatalf("error connecting to DB: %v", err)
176				}
177
178				doIngest(t, ing, postUpgradeData1, postUpgradeData2)
179
180				ing.Close()
181
182			}
183			upgradedDbInfo = SnapshotDB(t, dbContainer, *testDatabase, dbTmpDir, db, extensionState)
184		})
185	return
186}
187
188func getPristineDbInfo(t *testing.T, noData bool, extensionState testhelpers.ExtensionState) (pristineDbInfo dbSnapshot) {
189	withNewDBAtCurrentVersion(t, *testDatabase, extensionState,
190		/* preRestart */
191		func(container testcontainers.Container, _ string, db *pgxpool.Pool, tmpDir string) {
192			if noData {
193				return
194			}
195			ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
196			if err != nil {
197				t.Fatalf("error connecting to DB: %v", err)
198			}
199			defer ing.Close()
200
201			doIngest(t, ing, preUpgradeData1, preUpgradeData2)
202		},
203		/* postRestart */
204		func(container testcontainers.Container, _ string, db *pgxpool.Pool, tmpDir string) {
205			if !noData {
206				ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
207				if err != nil {
208					t.Fatalf("error connecting to DB: %v", err)
209				}
210				defer ing.Close()
211
212				doIngest(t, ing, postUpgradeData1, postUpgradeData2)
213			}
214			pristineDbInfo = SnapshotDB(t, container, *testDatabase, tmpDir, db, extensionState)
215		})
216	return
217}
218
219// pick a start time in the future so data won't get compressed
220const startTime = 6600000000000 // approx 210 years after the epoch
221var (
222	preUpgradeData1 = []prompb.TimeSeries{
223		{
224			Labels: []prompb.Label{
225				{Name: model.MetricNameLabelName, Value: "test"},
226				{Name: "test", Value: "test"},
227			},
228			Samples: []prompb.Sample{
229				{Timestamp: startTime + 1, Value: 0.1},
230				{Timestamp: startTime + 2, Value: 0.2},
231			},
232		},
233	}
234	preUpgradeData2 = []prompb.TimeSeries{
235		{
236			Labels: []prompb.Label{
237				{Name: model.MetricNameLabelName, Value: "test2"},
238				{Name: "foo", Value: "bar"},
239			},
240			Samples: []prompb.Sample{
241				{Timestamp: startTime + 4, Value: 2.2},
242			},
243		},
244	}
245
246	postUpgradeData1 = []prompb.TimeSeries{
247		{
248			Labels: []prompb.Label{
249				{Name: model.MetricNameLabelName, Value: "test"},
250				{Name: "testB", Value: "testB"},
251			},
252			Samples: []prompb.Sample{
253				{Timestamp: startTime + 4, Value: 0.4},
254				{Timestamp: startTime + 5, Value: 0.5},
255			},
256		},
257	}
258	postUpgradeData2 = []prompb.TimeSeries{
259		{
260			Labels: []prompb.Label{
261				{Name: model.MetricNameLabelName, Value: "test3"},
262				{Name: "baz", Value: "quf"},
263			},
264			Samples: []prompb.Sample{
265				{Timestamp: startTime + 66, Value: 6.0},
266			},
267		},
268	}
269)
270
271func addNode2(t testing.TB, DBName string) {
272	db, err := pgx.Connect(context.Background(), testhelpers.PgConnectURL(DBName, testhelpers.Superuser))
273	if err != nil {
274		t.Fatal(err)
275	}
276	err = testhelpers.AddDataNode2(db, DBName)
277	if err != nil {
278		t.Fatal(err)
279	}
280	if err = db.Close(context.Background()); err != nil {
281		t.Fatal(err)
282	}
283
284	//do this as prom user
285	dbProm, err := pgx.Connect(context.Background(), testhelpers.PgConnectURL(DBName, testhelpers.NoSuperuser))
286	if err != nil {
287		t.Fatal(err)
288	}
289	_, err = dbProm.Exec(context.Background(), "CALL add_prom_node('dn1');")
290	if err != nil {
291		t.Fatal(err)
292	}
293	if err = db.Close(context.Background()); err != nil {
294		t.Fatal(err)
295	}
296}
297
298// Start a db with the prev extra extension and a prev connector as well.
299// This ensures that we test upgrades of both the extension and the connector schema.
300// Then run preUpgrade and shut everything down.
301// Start a new db with the latest extra extension and migrate to the latest version of the connector schema.
302// Then run postUpgrade.
303func withDBStartingAtOldVersionAndUpgrading(
304	t testing.TB,
305	DBName string,
306	prevVersion semver.Version,
307	extensionState testhelpers.ExtensionState,
308	preUpgrade func(dbContainer testcontainers.Container, dbTmpDir string, connectorHost string, connectorPort nat.Port),
309	postUpgrade func(dbContainer testcontainers.Container, dbTmpDir string)) {
310	var err error
311	ctx := context.Background()
312
313	tmpDir, err := testhelpers.TempDir("update_test_out")
314	if err != nil {
315		log.Fatal(err)
316	}
317
318	dataDir, err := testhelpers.TempDir("update_test_data")
319	if err != nil {
320		log.Fatal(err)
321	}
322
323	prevDBImage, cleanImage := getDBImages(extensionState)
324	// Start a db with the prev extension and a prev connector as well
325	// Then run preUpgrade and shut everything down.
326	func() {
327		dbContainer, closer, err := testhelpers.StartDatabaseImage(ctx, prevDBImage, tmpDir, dataDir, *printLogs, extensionState)
328		if err != nil {
329			t.Fatal("Error setting up container", err)
330		}
331
332		defer func() { _ = closer.Close() }()
333
334		db, err := testhelpers.DbSetup(*testDatabase, testhelpers.NoSuperuser, true, extensionState)
335		if err != nil {
336			t.Fatal(err)
337			return
338		}
339		db.Close()
340
341		connectorImage := "timescale/promscale:" + prevVersion.String()
342		connector, err := testhelpers.StartConnectorWithImage(context.Background(), dbContainer, connectorImage, *printLogs, []string{}, *testDatabase)
343		if err != nil {
344			log.Fatal(err.Error())
345		}
346		defer testhelpers.StopContainer(ctx, connector, *printLogs)
347
348		connectorHost, err := connector.Host(ctx)
349		if err != nil {
350			t.Fatal(err)
351			return
352		}
353
354		connectorPort, err := connector.MappedPort(ctx, testhelpers.ConnectorPort)
355		if err != nil {
356			t.Fatal(err)
357			return
358		}
359		t.Logf("Running preUpgrade with old version of connector and db: connector=%v db=%v", connectorImage, prevDBImage)
360		preUpgrade(dbContainer, tmpDir, connectorHost, connectorPort)
361	}()
362
363	//Start a new connector and migrate.
364	//Then run postUpgrade
365	dbContainer, closer, err := testhelpers.StartDatabaseImage(ctx, cleanImage, tmpDir, dataDir, *printLogs, extensionState)
366	if err != nil {
367		t.Fatal("Error setting up container", err)
368	}
369
370	defer func() { _ = closer.Close() }()
371
372	t.Logf("upgrading versions %v => %v", prevVersion, version.Promscale)
373	connectURL := testhelpers.PgConnectURL(*testDatabase, testhelpers.NoSuperuser)
374	migrateToVersion(t, connectURL, version.Promscale, "azxtestcommit")
375
376	if extensionState.UsesMultinode() {
377		//add a node after upgrade; this tests strictly more functionality since we already have one node set up before
378		addNode2(t, *testDatabase)
379	}
380	t.Log("Running postUpgrade")
381	postUpgrade(dbContainer, tmpDir)
382
383}
384
385// Run a DB and connector at the current version. Run preRestart then restart the db
386// then run postRestart. A restart is necessary because we need a restart in the
387// upgrade path to change the extension that is available. But, a restart causes
388// Sequences to skip values. So, in order to have equivalent data, we need to make
389// sure that both the upgrade and this pristine path both have restarts.
390func withNewDBAtCurrentVersion(t testing.TB, DBName string, extensionState testhelpers.ExtensionState,
391	preRestart func(container testcontainers.Container, connectURL string, db *pgxpool.Pool, tmpDir string),
392	postRestart func(container testcontainers.Container, connectURL string, db *pgxpool.Pool, tmpDir string)) {
393	var err error
394	ctx := context.Background()
395
396	tmpDir, err := testhelpers.TempDir("update_test_out")
397	if err != nil {
398		log.Fatal(err)
399	}
400	dataDir, err := testhelpers.TempDir("update_test_data")
401	if err != nil {
402		log.Fatal(err)
403	}
404
405	_, cleanImage := getDBImages(extensionState)
406
407	func() {
408		container, closer, err := testhelpers.StartDatabaseImage(ctx, cleanImage, tmpDir, dataDir, *printLogs, extensionState)
409		if err != nil {
410			fmt.Println("Error setting up container", err)
411			os.Exit(1)
412		}
413
414		defer func() { _ = closer.Close() }()
415		testhelpers.WithDB(t, DBName, testhelpers.NoSuperuser, true, extensionState, func(_ *pgxpool.Pool, t testing.TB, connectURL string) {
416			migrateToVersion(t, connectURL, version.Promscale, "azxtestcommit")
417
418			// need to get a new pool after the Migrate to catch any GUC changes made during Migrate
419			db, err := pgxpool.Connect(context.Background(), connectURL)
420			if err != nil {
421				t.Fatal(err)
422			}
423			defer db.Close()
424			preRestart(container, connectURL, db, tmpDir)
425		})
426	}()
427	container, closer, err := testhelpers.StartDatabaseImage(ctx, cleanImage, tmpDir, dataDir, *printLogs, extensionState)
428	if err != nil {
429		fmt.Println("Error setting up container", err)
430		os.Exit(1)
431	}
432
433	if extensionState.UsesMultinode() {
434		addNode2(t, *testDatabase)
435	}
436	defer func() { _ = closer.Close() }()
437	connectURL := testhelpers.PgConnectURL(*testDatabase, testhelpers.NoSuperuser)
438	db, err := pgxpool.Connect(context.Background(), connectURL)
439	if err != nil {
440		t.Fatal(err)
441	}
442	defer db.Close()
443	postRestart(container, connectURL, db, tmpDir)
444}
445
446func migrateToVersion(t testing.TB, connectURL string, version string, commitHash string) {
447	err := extension.InstallUpgradeTimescaleDBExtensions(connectURL, extension.ExtensionMigrateOptions{Install: true, Upgrade: true, UpgradePreRelease: true})
448	if err != nil {
449		t.Fatal(err)
450	}
451
452	migratePool, err := pgx.Connect(context.Background(), connectURL)
453	if err != nil {
454		t.Fatal(err)
455	}
456	defer func() { _ = migratePool.Close(context.Background()) }()
457	err = runner.SetupDBState(migratePool, pgmodel.VersionInfo{Version: version, CommitHash: commitHash}, nil, extension.ExtensionMigrateOptions{Install: true, Upgrade: true, UpgradePreRelease: true})
458	if err != nil {
459		t.Fatal(err)
460	}
461}
462
463func tsWriteReq(ts []prompb.TimeSeries) prompb.WriteRequest {
464	return prompb.WriteRequest{
465		Timeseries: ts,
466	}
467}
468
469func writeReqToHttp(r prompb.WriteRequest) *bytes.Reader {
470	data, _ := proto.Marshal(&r)
471	body := snappy.Encode(nil, data)
472	return bytes.NewReader(body)
473}
474
475func doWrite(t *testing.T, client *http.Client, url string, data ...[]prompb.TimeSeries) {
476	for _, data := range data {
477		body := writeReqToHttp(tsWriteReq(copyMetrics(data)))
478		req, err := http.NewRequest("POST", url, body)
479		if err != nil {
480			t.Errorf("Error creating request: %v", err)
481		}
482		req.Header.Add("Content-Encoding", "snappy")
483		req.Header.Set("Content-Type", "application/x-protobuf")
484		req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
485		resp, err := client.Do(req)
486		if err != nil {
487			t.Fatal(err)
488		}
489		if resp.StatusCode != 200 {
490			t.Fatal("non-ok status:", resp.Status)
491		}
492
493		_, _ = io.Copy(ioutil.Discard, resp.Body)
494		resp.Body.Close()
495	}
496}
497
498func doIngest(t *testing.T, ingstr *ingestor.DBIngestor, data ...[]prompb.TimeSeries) {
499	for _, data := range data {
500		wr := ingestor.NewWriteRequest()
501		wr.Timeseries = copyMetrics(data)
502		_, _, err := ingstr.Ingest(wr)
503		if err != nil {
504			t.Fatalf("ingest error: %v", err)
505		}
506		_ = ingstr.CompleteMetricCreation()
507	}
508}
509
510// deep copy the metrics since we mutate them, and don't want to invalidate the tests
511func copyMetrics(metrics []prompb.TimeSeries) []prompb.TimeSeries {
512	out := make([]prompb.TimeSeries, len(metrics))
513	copy(out, metrics)
514	for i := range out {
515		out[i].Labels = make([]prompb.Label, len(metrics[i].Labels))
516		out[i].Samples = make([]prompb.Sample, len(metrics[i].Samples))
517		copy(out[i].Labels, metrics[i].Labels)
518		copy(out[i].Samples, metrics[i].Samples)
519	}
520	return out
521}
522
523func TestExtensionUpgrade(t *testing.T) {
524	var err error
525	var version string
526
527	if true {
528		t.Skip("Temporarily disabled test")
529	}
530
531	ctx := context.Background()
532
533	buildPromscaleImageFromRepo(t)
534	_, dbContainer, closer := startDB(t, ctx)
535	defer closer.Close()
536
537	// as the default installed version ext is rc4 in the test image downgrade it to rc2
538	// to test upgrade flow.
539	extVersion := "2.0.0-rc2"
540	dropAndCreateExt(t, ctx, extVersion)
541
542	db, err := pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser))
543	if err != nil {
544		t.Fatal(err)
545	}
546
547	err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version)
548	if err != nil {
549		t.Fatal(err)
550	}
551
552	if version != extVersion {
553		t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension false")
554	}
555
556	// start promscale & test upgrade-prerelease-extensions as false
557	// Now the ext is rc2 it should be rc2 after promscale startup too
558	func() {
559		connectorImage := "timescale/promscale:latest"
560		databaseName := "postgres"
561		connector, err := testhelpers.StartConnectorWithImage(ctx, dbContainer, connectorImage, *printLogs, []string{}, databaseName)
562		if err != nil {
563			t.Fatal(err)
564		}
565		defer testhelpers.StopContainer(ctx, connector, *printLogs)
566		err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version)
567		if err != nil {
568			t.Fatal(err)
569		}
570
571		if version != extVersion {
572			t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension false")
573		}
574		t.Logf("successfully tested extension upgrade flow with --upgrade-prereleases-extensions false")
575	}()
576
577	db.Close(ctx)
578
579	// start a new connector and test --upgrade-prerelease-extensions as true
580	// the default installed ext version is rc2 now it should upgrade it to rc4
581	func() {
582		connectorImage := "timescale/promscale:latest"
583		databaseName := "postgres"
584		flags := []string{"-upgrade-prerelease-extensions", "true"}
585		connector, err := testhelpers.StartConnectorWithImage(ctx, dbContainer, connectorImage, *printLogs, flags, databaseName)
586		if err != nil {
587			t.Fatal(err)
588		}
589		defer testhelpers.StopContainer(ctx, connector, *printLogs)
590
591		var versionStr string
592		db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser))
593		if err != nil {
594			t.Fatal(err)
595		}
596		err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&versionStr)
597		if err != nil {
598			t.Fatal(err)
599		}
600
601		db.Close(ctx)
602
603		if versionStr != "2.0.0-rc4" {
604			t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension true")
605		}
606		t.Logf("successfully tested extension upgrade flow with --upgrade-prereleases-extensions true")
607	}()
608}
609
610func TestMigrationFailure(t *testing.T) {
611	var err error
612	var version string
613
614	if true {
615		t.Skip("Temporarily disabled test")
616	}
617	ctx := context.Background()
618
619	buildPromscaleImageFromRepo(t)
620	db, dbContainer, closer := startDB(t, ctx)
621	defer closer.Close()
622
623	// start promscale & test upgrade-extensions as false
624	func() {
625		connectorImage := "timescale/promscale:latest"
626		databaseName := "postgres"
627		connector, err := testhelpers.StartConnectorWithImage(ctx, dbContainer, connectorImage, *printLogs, []string{}, databaseName)
628		if err != nil {
629			t.Fatal(err)
630		}
631		defer testhelpers.StopContainer(ctx, connector, *printLogs)
632
633		err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version)
634		if err != nil {
635			t.Fatal(err)
636		}
637
638		db.Close(ctx)
639		if version != "2.0.0-rc4" {
640			t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension false")
641		}
642		t.Logf("successfully tested extension upgrade flow with --upgrade-prereleases-extensions false.")
643	}()
644
645	// As the timescaleDB installed version is rc4, lets install the 1.7.3 ext version
646	extVersion := "1.7.3"
647	dropAndCreateExt(t, ctx, extVersion)
648
649	db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser))
650	if err != nil {
651		t.Fatal(err)
652	}
653
654	err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version)
655	if err != nil {
656		t.Fatal(err)
657	}
658
659	db.Close(ctx)
660	if version != extVersion {
661		t.Fatal("failed to verify upgrade extension with -upgrade-prerelease-extension false")
662	}
663
664	// start a new connector and test --upgrade-extensions as true which is by default set in flags
665	// the migration should fail (upgrade path in tsdb isn't available) but promscale should be running.
666	func() {
667		connectorImage := "timescale/promscale:latest"
668		databaseName := "postgres"
669		connector, err := testhelpers.StartConnectorWithImage(ctx, dbContainer, connectorImage, *printLogs, []string{}, databaseName)
670		if err != nil {
671			t.Fatal(err)
672		}
673		defer testhelpers.StopContainer(ctx, connector, *printLogs)
674
675		var version string
676		db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser))
677		if err != nil {
678			t.Fatal(err)
679		}
680		err = db.QueryRow(ctx, `SELECT extversion FROM pg_extension where extname='timescaledb'`).Scan(&version)
681		if err != nil {
682			t.Fatal(err)
683		}
684		db.Close(ctx)
685
686		if version != "1.7.3" {
687			t.Fatal("failed to verify timescaleDB extension version")
688		}
689
690		// Now from the check we are know that migration failed from 1.7.3 to 1.7.4
691		// as the upgrade script doesn't exist within timescaleDB image.
692		// Now check promscale is still running on migration failure.
693		exitCode, err := connector.Exec(context.Background(), []string{"echo", "hello"})
694		if exitCode != 0 || err != nil {
695			t.Fatal("promscale failed to run extension migration failure", err)
696		}
697		t.Logf("successfully tested extension upgrade flow with --upgrade-prereleases-extensions true where migration fails and promscale keeps running.")
698	}()
699}
700
// buildPromscaleImageFromRepo runs `docker build` against the repository root
// (three directories up) with build/Dockerfile and tags the result
// timescale/promscale:latest. A failing build fails the test immediately.
func buildPromscaleImageFromRepo(t *testing.T) {
	t.Logf("building promscale image from the codebase")

	buildArgs := []string{
		"build",
		"-t", "timescale/promscale:latest",
		"./../../../",
		"--file", "./../../../build/Dockerfile",
	}
	if err := exec.Command("docker", buildArgs...).Run(); err != nil {
		t.Fatal(err)
	}

	t.Logf("successfully built promscale:latest image from codebase.")
}
710
711func startDB(t *testing.T, ctx context.Context) (*pgx.Conn, testcontainers.Container, io.Closer) {
712	tmpDir, err := testhelpers.TempDir("update_test_out")
713	if err != nil {
714		t.Fatal(err)
715	}
716
717	dataDir, err := testhelpers.TempDir("update_test_data")
718	if err != nil {
719		t.Fatal(err)
720	}
721
722	dbContainer, closer, err := testhelpers.StartDatabaseImage(ctx, "timescaledev/promscale-extension:testing-extension-upgrade", tmpDir, dataDir, *printLogs, testhelpers.Timescale1AndPromscale)
723	if err != nil {
724		t.Fatal("Error setting up container", err)
725	}
726
727	// need to get a new pool after the Migrate to catch any GUC changes made during Migrate
728	db, err := pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser))
729	if err != nil {
730		t.Fatal(err)
731	}
732
733	return db, dbContainer, closer
734}
735
736func dropAndCreateExt(t *testing.T, ctx context.Context, extVersion string) {
737	// Drop existing installed extension & install a lower extension version to test upgrade
738	db, err := pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser))
739	if err != nil {
740		t.Fatal(err)
741	}
742
743	_, err = db.Exec(ctx, `DROP EXTENSION timescaledb`)
744	if err != nil {
745		t.Fatal(err)
746	}
747	db.Close(ctx)
748
749	db, err = pgx.Connect(ctx, testhelpers.PgConnectURL("postgres", testhelpers.Superuser))
750	if err != nil {
751		t.Fatal(err)
752	}
753
754	_, err = db.Exec(ctx, fmt.Sprintf(`CREATE EXTENSION timescaledb version '%s'`, extVersion))
755	if err != nil {
756		t.Fatal(err)
757	}
758	db.Close(ctx)
759}
760