1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package util
6
7import (
8	"context"
9	"flag"
10	"fmt"
11	"os"
12	"testing"
13	"time"
14
15	"github.com/jackc/pgx/v4/pgxpool"
16	_ "github.com/jackc/pgx/v4/stdlib"
17	"github.com/timescale/promscale/pkg/internal/testhelpers"
18	"github.com/timescale/promscale/pkg/log"
19)
20
// Command-line flags that tune the integration tests in this package.
var (
	// useDocker is consulted by TestMain: a PG container is started only
	// when this is true and the tests are not running in -short mode.
	useDocker = flag.Bool("use-docker", true, "start database using a docker container")

	// testDatabase is the database argument each test hands to testhelpers.WithDB.
	testDatabase = flag.String("database", "tmp_db_timescale_migrate_test", "database to run integration tests on")

	// electionInterval is the interval passed to NewScheduledElector.
	electionInterval = flag.Duration("election-interval", time.Second, "Scheduled election interval")
)
26
// extensionState is the testhelpers extension configuration shared by this
// file: it is passed to testhelpers.StartPGContainer in TestMain and to every
// testhelpers.WithDB call in the tests.
// NOTE(review): presumably selects TimescaleDB 2 together with the Promscale
// extension — confirm against the testhelpers package.
const extensionState = testhelpers.Timescale2AndPromscale
28
29func TestPgLeaderLock(t *testing.T) {
30	testhelpers.WithDB(t, *testDatabase, testhelpers.NoSuperuser, false, extensionState, func(pool *pgxpool.Pool, t testing.TB, connectURL string) {
31		lock, err := NewPgLeaderLock(1, connectURL, nil)
32		if err != nil {
33			t.Fatal(err)
34		}
35		defer lock.Close()
36		if !lock.locked() {
37			t.Error("Couldn't obtain the lock")
38		}
39
40		newLock, err := NewPgLeaderLock(1, connectURL, nil)
41		if err != nil {
42			t.Fatal(err)
43		}
44		defer newLock.Close()
45		if newLock.locked() {
46			t.Error("Lock should have already been taken")
47		}
48
49		if err = lock.release(); err != nil {
50			t.Errorf("Failed to release a lock. Error: %v", err)
51		}
52
53		if lock.locked() {
54			t.Error("Should be unlocked after release")
55		}
56
57		_, err = newLock.tryLock()
58		if err != nil {
59			t.Fatal(err)
60		}
61
62		if !newLock.locked() {
63			t.Error("New lock should take over")
64		}
65	})
66}
67
68func TestElector(t *testing.T) {
69	testhelpers.WithDB(t, *testDatabase, testhelpers.NoSuperuser, false, extensionState, func(pool *pgxpool.Pool, t testing.TB, connectURL string) {
70		lock1, err := NewPgLeaderLock(2, connectURL, nil)
71		if err != nil {
72			t.Error(err)
73		}
74		defer lock1.Close()
75		elector1 := NewElector(lock1)
76		leader, _ := elector1.BecomeLeader()
77		if !leader {
78			t.Error("Failed to become a leader")
79		}
80
81		lock2, err := NewPgLeaderLock(2, connectURL, nil)
82		if err != nil {
83			t.Error(err)
84		}
85		defer lock2.Close()
86		elector2 := NewElector(lock2)
87		leader, _ = elector2.BecomeLeader()
88		if leader {
89			t.Error("Shouldn't be possible")
90		}
91
92		err = elector1.Resign()
93		if err != nil {
94			t.Fatal(err)
95		}
96		leader, _ = elector2.BecomeLeader()
97		if !leader {
98			t.Error("Should become a leader")
99		}
100	})
101}
102
103func TestPrometheusLivenessCheck(t *testing.T) {
104	testhelpers.WithDB(t, *testDatabase, testhelpers.NoSuperuser, false, extensionState, func(pool *pgxpool.Pool, t testing.TB, connectURL string) {
105		lock1, err := NewPgLeaderLock(3, connectURL, nil)
106		if err != nil {
107			t.Error(err)
108		}
109		defer lock1.Close()
110		lock2, err := NewPgLeaderLock(3, connectURL, nil)
111		if err != nil {
112			t.Error(err)
113		}
114		defer lock2.Close()
115		elector1 := NewScheduledElector(lock1, *electionInterval)
116		elector2 := NewScheduledElector(lock2, *electionInterval)
117		leader1 := elector1.elect()
118		if !leader1 {
119			t.Error("Failed to become a leader")
120		}
121		leader2 := elector2.elect()
122		if leader2 {
123			t.Error("Two leaders")
124		}
125		elector1.PrometheusLivenessCheck(0, 0)
126		leader2 = elector2.elect()
127		if !leader2 {
128			t.Error("Failed to become a leader after live fail")
129		}
130		leader1, _ = lock1.IsLeader()
131		if leader1 {
132			t.Error("Shouldn't be a leader")
133		}
134		if !elector1.isScheduledElectionPaused() {
135			t.Error("Scheduled election should be paused")
136		}
137		elector1.PrometheusLivenessCheck(time.Now().UnixNano(), time.Hour)
138		if elector1.isScheduledElectionPaused() {
139			t.Error("Scheduled election shouldn't be paused anymore")
140		}
141	})
142}
143func TestMain(m *testing.M) {
144	flag.Parse()
145	err := log.Init(log.Config{
146		Level: "debug",
147	})
148	if err != nil {
149		panic(err)
150	}
151	ctx := context.Background()
152	if !testing.Short() && *useDocker {
153
154		_, closer, err := testhelpers.StartPGContainer(ctx, extensionState, "", false)
155		if err != nil {
156			fmt.Println("Error setting up container", err)
157			os.Exit(1)
158		}
159		defer closer.Close()
160	}
161	code := m.Run()
162	os.Exit(code)
163}
164