1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4 5package util 6 7import ( 8 "context" 9 "flag" 10 "fmt" 11 "os" 12 "testing" 13 "time" 14 15 "github.com/jackc/pgx/v4/pgxpool" 16 _ "github.com/jackc/pgx/v4/stdlib" 17 "github.com/timescale/promscale/pkg/internal/testhelpers" 18 "github.com/timescale/promscale/pkg/log" 19) 20 21var ( 22 useDocker = flag.Bool("use-docker", true, "start database using a docker container") 23 testDatabase = flag.String("database", "tmp_db_timescale_migrate_test", "database to run integration tests on") 24 electionInterval = flag.Duration("election-interval", 1*time.Second, "Scheduled election interval") 25) 26 27const extensionState = testhelpers.Timescale2AndPromscale 28 29func TestPgLeaderLock(t *testing.T) { 30 testhelpers.WithDB(t, *testDatabase, testhelpers.NoSuperuser, false, extensionState, func(pool *pgxpool.Pool, t testing.TB, connectURL string) { 31 lock, err := NewPgLeaderLock(1, connectURL, nil) 32 if err != nil { 33 t.Fatal(err) 34 } 35 defer lock.Close() 36 if !lock.locked() { 37 t.Error("Couldn't obtain the lock") 38 } 39 40 newLock, err := NewPgLeaderLock(1, connectURL, nil) 41 if err != nil { 42 t.Fatal(err) 43 } 44 defer newLock.Close() 45 if newLock.locked() { 46 t.Error("Lock should have already been taken") 47 } 48 49 if err = lock.release(); err != nil { 50 t.Errorf("Failed to release a lock. Error: %v", err) 51 } 52 53 if lock.locked() { 54 t.Error("Should be unlocked after release") 55 } 56 57 _, err = newLock.tryLock() 58 if err != nil { 59 t.Fatal(err) 60 } 61 62 if !newLock.locked() { 63 t.Error("New lock should take over") 64 } 65 }) 66} 67 68func TestElector(t *testing.T) { 69 testhelpers.WithDB(t, *testDatabase, testhelpers.NoSuperuser, false, extensionState, func(pool *pgxpool.Pool, t testing.TB, connectURL string) { 70 lock1, err := NewPgLeaderLock(2, connectURL, nil) 71 if err != nil { 72 t.Error(err) 73 } 74 defer lock1.Close() 75 elector1 := NewElector(lock1) 76 leader, _ := elector1.BecomeLeader() 77 if !leader { 78 t.Error("Failed to become a leader") 79 } 80 81 lock2, err := NewPgLeaderLock(2, connectURL, nil) 82 if err != nil { 83 t.Error(err) 84 } 85 defer lock2.Close() 86 elector2 := NewElector(lock2) 87 leader, _ = elector2.BecomeLeader() 88 if leader { 89 t.Error("Shouldn't be possible") 90 } 91 92 err = elector1.Resign() 93 if err != nil { 94 t.Fatal(err) 95 } 96 leader, _ = elector2.BecomeLeader() 97 if !leader { 98 t.Error("Should become a leader") 99 } 100 }) 101} 102 103func TestPrometheusLivenessCheck(t *testing.T) { 104 testhelpers.WithDB(t, *testDatabase, testhelpers.NoSuperuser, false, extensionState, func(pool *pgxpool.Pool, t testing.TB, connectURL string) { 105 lock1, err := NewPgLeaderLock(3, connectURL, nil) 106 if err != nil { 107 t.Error(err) 108 } 109 defer lock1.Close() 110 lock2, err := NewPgLeaderLock(3, connectURL, nil) 111 if err != nil { 112 t.Error(err) 113 } 114 defer lock2.Close() 115 elector1 := NewScheduledElector(lock1, *electionInterval) 116 elector2 := NewScheduledElector(lock2, *electionInterval) 117 leader1 := elector1.elect() 118 if !leader1 { 119 t.Error("Failed to become a leader") 120 } 121 leader2 := elector2.elect() 122 if leader2 { 123 t.Error("Two leaders") 124 } 125 elector1.PrometheusLivenessCheck(0, 0) 126 leader2 = elector2.elect() 127 if !leader2 { 128 t.Error("Failed to become a leader after live fail") 129 } 130 leader1, _ = lock1.IsLeader() 131 if leader1 { 132 t.Error("Shouldn't be a leader") 133 } 134 if !elector1.isScheduledElectionPaused() { 135 t.Error("Scheduled election should be paused") 136 } 137 elector1.PrometheusLivenessCheck(time.Now().UnixNano(), time.Hour) 138 if elector1.isScheduledElectionPaused() { 139 t.Error("Scheduled election shouldn't be paused anymore") 140 } 141 }) 142} 143func TestMain(m *testing.M) { 144 flag.Parse() 145 err := log.Init(log.Config{ 146 Level: "debug", 147 }) 148 if err != nil { 149 panic(err) 150 } 151 ctx := context.Background() 152 if !testing.Short() && *useDocker { 153 154 _, closer, err := testhelpers.StartPGContainer(ctx, extensionState, "", false) 155 if err != nil { 156 fmt.Println("Error setting up container", err) 157 os.Exit(1) 158 } 159 defer closer.Close() 160 } 161 code := m.Run() 162 os.Exit(code) 163} 164