1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4package runner
5
6import (
7	"context"
8	"fmt"
9	"regexp"
10	"strconv"
11	"sync/atomic"
12	"time"
13
14	"github.com/jackc/pgx/v4"
15	"github.com/timescale/promscale/pkg/api"
16	"github.com/timescale/promscale/pkg/log"
17	"github.com/timescale/promscale/pkg/pgclient"
18	"github.com/timescale/promscale/pkg/pgmodel"
19	"github.com/timescale/promscale/pkg/pgmodel/common/extension"
20	"github.com/timescale/promscale/pkg/pgmodel/common/schema"
21	"github.com/timescale/promscale/pkg/tenancy"
22	"github.com/timescale/promscale/pkg/util"
23	"github.com/timescale/promscale/pkg/version"
24)
25
26var (
27	appVersion         = pgmodel.VersionInfo{Version: version.Promscale, CommitHash: version.CommitHash}
28	migrationLockError = fmt.Errorf("Could not acquire migration lock. Ensure there are no other connectors running and try again.")
29)
30
// CreateClient prepares the database and builds the pgclient.Client used by
// the connector. In order, it:
//
//  1. installs/upgrades the TimescaleDB extension when cfg.InstallExtensions is set,
//  2. creates the schema-version advisory lock,
//  3. runs the schema migration when cfg.Migrate is set,
//  4. takes the shared schema-version lease when cfg.UseVersionLease is set,
//  5. checks dependencies and emits configuration/license warnings,
//  6. initializes the leader elector and the tenancy authorizer,
//  7. creates the client.
//
// It returns (nil, nil) when cfg.StopAfterMigrate is set and the migration
// succeeded; callers must therefore handle a nil client with a nil error.
// Any failure along the way is returned as a non-nil error with a nil client.
//
// NOTE(review): `elector` and `startupError` are not declared in this
// function; they are presumably package-level identifiers defined elsewhere
// in the package.
func CreateClient(cfg *Config, promMetrics *api.Metrics) (*pgclient.Client, error) {
	// The TimescaleDB migration has to happen before other connections
	// are open, also it has to happen as the first command on a connection.
	// Thus we cannot rely on the migration lock here. Instead we assume
	// that upgrading TimescaleDB will not break existing connectors.
	// (upgrading the DB will force-close all existing connections, so we may
	// add a reconnect check that the DB has an appropriate version)
	connStr := cfg.PgmodelCfg.GetConnectionStr()
	extOptions := extension.ExtensionMigrateOptions{
		Install:           cfg.InstallExtensions,
		Upgrade:           cfg.UpgradeExtensions,
		UpgradePreRelease: cfg.UpgradePrereleaseExtensions,
	}

	if cfg.InstallExtensions {
		err := extension.InstallUpgradeTimescaleDBExtensions(connStr, extOptions)
		if err != nil {
			return nil, err
		}
	}

	// Migration lock logic:
	// We don't want to upgrade the schema version while we still have connectors
	// attached who think the schema is at the old version. To prevent this, as
	// best we can, each normal connection attempts to grab a (shared) advisory
	// lock on schemaLockId, and we attempt to grab an exclusive lock on it
	// before running migrate. This implies that migration must be run when no
	// other connector is running.
	schemaVersionLease, err := util.NewPgAdvisoryLock(schema.LockID, connStr)
	if err != nil {
		// The underlying cause is only logged; the caller receives the
		// generic startupError.
		log.Error("msg", "error creating schema version lease", "err", err)
		return nil, startupError
	}
	// after the client has started it's in charge of maintaining the leases
	defer schemaVersionLease.Close()

	// Starts out true so that, when no migration is attempted at all, the
	// dependency check below is invoked exactly as it is after a failed lock
	// acquisition. It is only overwritten when cfg.Migrate is set.
	migrationFailedDueToLockError := true
	if cfg.Migrate {
		conn, err := schemaVersionLease.Conn()
		if err != nil {
			return nil, fmt.Errorf("migration error: %w", err)
		}
		// Passing a nil lease tells SetupDBState to skip the migration lock.
		lease := schemaVersionLease
		if !cfg.UseVersionLease {
			lease = nil
		}
		err = SetupDBState(conn, appVersion, lease, extOptions)
		// SetupDBState returns migrationLockError unwrapped, so an identity
		// comparison is sufficient here.
		migrationFailedDueToLockError = err == migrationLockError
		if err != nil && err != migrationLockError {
			return nil, fmt.Errorf("migration error: %w", err)
		}

		if cfg.StopAfterMigrate {
			// At this point err is either nil or migrationLockError.
			if err != nil {
				return nil, err
			}
			log.Info("msg", "Migration successful, exiting")
			return nil, nil
		}
	} else {
		log.Info("msg", "Skipping migration")
	}

	if cfg.UseVersionLease {
		// Grab a lease to protect version checking and client creation. This
		// lease will be released when the schemaVersionLease is closed.
		locked, err := schemaVersionLease.GetSharedAdvisoryLock()
		if err != nil {
			return nil, fmt.Errorf("could not acquire schema version lease due to: %w", err)
		}
		if !locked {
			return nil, fmt.Errorf("could not acquire schema version lease. is a migration in progress?")
		}
	} else {
		log.Warn("msg", "skipping schema version lease")
	}

	// Check the database is on the correct version.
	// This must be done even if we attempted a migrate; if we failed to acquire
	// the lock we'll start up as-if we were never supposed to migrate in the
	// first place. This is also needed as checkDependencies populates our
	// extension metadata
	conn, err := schemaVersionLease.Conn()
	if err != nil {
		return nil, fmt.Errorf("Dependency checking error while trying to open DB connection: %w", err)
	}
	err = pgmodel.CheckDependencies(conn, appVersion, migrationFailedDueToLockError, extOptions)
	if err != nil {
		err = fmt.Errorf("dependency error: %w", err)
		// NOTE(review): the flag is also true when migration was skipped
		// (cfg.Migrate unset), so this message is logged in that case too.
		if migrationFailedDueToLockError {
			log.Error("msg", "Unable to run migrations; failed to acquire the lock. If the database is on an incorrect version, ensure there are no other connectors running and try again.", "err", err)
		}
		return nil, err
	}

	if cfg.InstallExtensions {
		// Only check for background workers if TimescaleDB is installed.
		if notOk, err := isBGWLessThanDBs(conn); err != nil {
			return nil, fmt.Errorf("Error checking the number of background workers: %w", err)
		} else if notOk {
			log.Warn("msg", "Maximum background worker setting is too low for the number of databases in your system. "+
				"Please increase your timescaledb.max_background_workers setting. See https://docs.timescale.com/latest/getting-started/configuring#workers for more information.")
		}
	}

	// An OSS (Apache2) TimescaleDB license only produces a warning; start-up
	// continues.
	isLicenseOSS, err := isTimescaleDBOSS(conn)
	if err != nil {
		return nil, fmt.Errorf("fetching license information: %w", err)
	}
	if isLicenseOSS {
		log.Warn("msg", "WARNING: Using the Apache2 version of TimescaleDB. This version does not include "+
			"compression and thus performance and disk usage will be significantly negatively effected.")
	}

	// Election must be done after migration and version-checking: if we're on
	// the wrong version we should not participate in leader-election.
	elector, err = initElector(cfg, promMetrics)

	if err != nil {
		return nil, fmt.Errorf("elector init error: %w", err)
	}

	// initElector returns a nil elector when no HA group lock ID is configured.
	if (elector == nil && !cfg.APICfg.HighAvailability) && !cfg.APICfg.ReadOnly {
		log.Info(
			"msg",
			"Prometheus HA is not enabled",
		)
	}

	// A nil leasing function is passed to the client when the version lease
	// is disabled.
	leasingFunction := getSchemaLease
	if !cfg.UseVersionLease {
		leasingFunction = nil
	}

	// Default to the no-op authorizer; replace it only when multi-tenancy is
	// enabled. Tenant validation is selective unless explicitly skipped.
	multiTenancy := tenancy.NewNoopAuthorizer()
	if cfg.TenancyCfg.EnableMultiTenancy {
		multiTenancyConfig := tenancy.NewAllowAllTenantsConfig(cfg.TenancyCfg.AllowNonMTWrites)
		if !cfg.TenancyCfg.SkipTenantValidation {
			multiTenancyConfig = tenancy.NewSelectiveTenancyConfig(cfg.TenancyCfg.ValidTenantsList, cfg.TenancyCfg.AllowNonMTWrites)
		}
		multiTenancy, err = tenancy.NewAuthorizer(multiTenancyConfig)
		if err != nil {
			return nil, fmt.Errorf("new tenancy: %w", err)
		}
		cfg.APICfg.MultiTenancy = multiTenancy
	}

	// client has to be initiated after migrate since migrate
	// can change database GUC settings
	client, err := pgclient.NewClient(&cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly)
	if err != nil {
		return nil, fmt.Errorf("client creation error: %w", err)
	}

	return client, nil
}
187
188func isTimescaleDBOSS(conn *pgx.Conn) (bool, error) {
189	var (
190		isTimescaleDB bool
191		isLicenseOSS  bool
192	)
193	err := conn.QueryRow(context.Background(), "SELECT "+schema.Catalog+".is_timescaledb_installed()").Scan(&isTimescaleDB)
194	if err != nil {
195		return false, fmt.Errorf("error fetching whether TimescaleDB is installed: %w", err)
196	}
197	if !isTimescaleDB {
198		// Return false so that we don't warn for OSS TimescaleDB.
199		return false, nil
200	}
201	err = conn.QueryRow(context.Background(), "SELECT "+schema.Catalog+".is_timescaledb_oss()").Scan(&isLicenseOSS)
202	if err != nil {
203		return false, fmt.Errorf("error fetching TimescaleDB license: %w", err)
204	}
205	return isLicenseOSS, nil
206}
207
208// isBGWLessThanDBs checks if the background workers count is less than the database count. It should be
209// called only if TimescaleDB is installed.
210func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) {
211	var (
212		dbs       int
213		maxBGWStr string
214	)
215	err := conn.QueryRow(context.Background(), "SHOW timescaledb.max_background_workers").Scan(&maxBGWStr)
216	if err != nil {
217		return false, fmt.Errorf("Unable to fetch timescaledb.max_background_workers: %w", err)
218	}
219	maxBGWs, err := strconv.Atoi(maxBGWStr)
220	if err != nil {
221		return false, fmt.Errorf("maxBGw string conversion: %w", err)
222	}
223	err = conn.QueryRow(context.Background(), "SELECT count(*) from pg_catalog.pg_database").Scan(&dbs)
224	if err != nil {
225		return false, fmt.Errorf("Unable to fetch count of all databases: %w", err)
226	}
227	if maxBGWs < dbs+2 {
228		return true, nil
229	}
230	return false, nil
231}
232
233func initElector(cfg *Config, metrics *api.Metrics) (*util.Elector, error) {
234	if cfg.HaGroupLockID == 0 {
235		return nil, nil
236	}
237	if cfg.PrometheusTimeout == -1 {
238		return nil, fmt.Errorf("Prometheus timeout configuration must be set when using PG advisory lock")
239	}
240
241	connStr := cfg.PgmodelCfg.GetConnectionStr()
242	lock, err := util.NewPgLeaderLock(cfg.HaGroupLockID, connStr, getSchemaLease)
243	if err != nil {
244		return nil, fmt.Errorf("Error creating advisory lock\nhaGroupLockId: %d\nerr: %s\n", cfg.HaGroupLockID, err)
245	}
246	log.Info("msg", "Initializing leader election based on PostgreSQL advisory lock")
247	scheduledElector := util.NewScheduledElector(lock, cfg.ElectionInterval)
248	if cfg.PrometheusTimeout != 0 {
249		go func() {
250			ticker := time.NewTicker(promLivenessCheck)
251			for range ticker.C {
252				lastReq := atomic.LoadInt64(&metrics.LastRequestUnixNano)
253				scheduledElector.PrometheusLivenessCheck(lastReq, cfg.PrometheusTimeout)
254			}
255		}()
256	}
257	return &scheduledElector.Elector, nil
258}
259
260func SetupDBState(conn *pgx.Conn, appVersion pgmodel.VersionInfo, leaseLock *util.PgAdvisoryLock, extOptions extension.ExtensionMigrateOptions) error {
261	// At startup migrators attempt to grab the schema-version lock. If this
262	// fails that means some other connector is running. All is not lost: some
263	// other connector may have migrated the DB to the correct version. We warn,
264	// then start the connector as normal. If we are on the wrong version, the
265	// normal version-check code will prevent us from running.
266
267	if leaseLock != nil {
268		locked, err := leaseLock.GetAdvisoryLock()
269		if err != nil {
270			return fmt.Errorf("error while acquiring migration lock %w", err)
271		}
272		if !locked {
273			return migrationLockError
274		}
275		defer func() {
276			_, err := leaseLock.Unlock()
277			if err != nil {
278				log.Error("msg", "error while releasing migration lock", "err", err)
279			}
280		}()
281	} else {
282		log.Warn("msg", "skipping migration lock")
283	}
284
285	err := pgmodel.Migrate(conn, appVersion, extOptions)
286	if err != nil {
287		return fmt.Errorf("Error while trying to migrate DB: %w", err)
288	}
289
290	installedPromscaleExtension, err := extension.InstallUpgradePromscaleExtensions(conn, extOptions)
291	if err != nil {
292		return err
293	}
294
295	pgmodel.UpdateTelemetry(conn, appVersion, installedPromscaleExtension)
296	return nil
297}
298
// compileAnchoredRegexString compiles s so that it must match the entire
// input: the expression is wrapped in a non-capturing group and anchored with
// ^ and $. It returns a nil regexp together with the compile error when s is
// not a valid expression.
func compileAnchoredRegexString(s string) (*regexp.Regexp, error) {
	anchored := "^(?:" + s + ")$"
	return regexp.Compile(anchored)
}
306
307// Except for migration, every connection that communicates with the DB must be
308// guarded by an instante of the schema-version lease to ensure that no other
309// connector can migrate the DB out from under it. We do not bother to release
310// said lease; in such and event the connector will be shutdown anyway, and
311// connection-death will close the connection.
312func getSchemaLease(ctx context.Context, conn *pgx.Conn) error {
313	err := util.GetSharedLease(ctx, conn, schema.LockID)
314	if err != nil {
315		return err
316	}
317	return pgmodel.CheckSchemaVersion(ctx, conn, appVersion, false)
318}
319