1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4package runner 5 6import ( 7 "context" 8 "fmt" 9 "regexp" 10 "strconv" 11 "sync/atomic" 12 "time" 13 14 "github.com/jackc/pgx/v4" 15 "github.com/timescale/promscale/pkg/api" 16 "github.com/timescale/promscale/pkg/log" 17 "github.com/timescale/promscale/pkg/pgclient" 18 "github.com/timescale/promscale/pkg/pgmodel" 19 "github.com/timescale/promscale/pkg/pgmodel/common/extension" 20 "github.com/timescale/promscale/pkg/pgmodel/common/schema" 21 "github.com/timescale/promscale/pkg/tenancy" 22 "github.com/timescale/promscale/pkg/util" 23 "github.com/timescale/promscale/pkg/version" 24) 25 26var ( 27 appVersion = pgmodel.VersionInfo{Version: version.Promscale, CommitHash: version.CommitHash} 28 migrationLockError = fmt.Errorf("Could not acquire migration lock. Ensure there are no other connectors running and try again.") 29) 30 31func CreateClient(cfg *Config, promMetrics *api.Metrics) (*pgclient.Client, error) { 32 // The TimescaleDB migration has to happen before other connections 33 // are open, also it has to happen as the first command on a connection. 34 // Thus we cannot rely on the migration lock here. Instead we assume 35 // that upgrading TimescaleDB will not break existing connectors. 36 // (upgrading the DB will force-close all existing connections, so we may 37 // add a reconnect check that the DB has an appropriate version) 38 connStr := cfg.PgmodelCfg.GetConnectionStr() 39 extOptions := extension.ExtensionMigrateOptions{ 40 Install: cfg.InstallExtensions, 41 Upgrade: cfg.UpgradeExtensions, 42 UpgradePreRelease: cfg.UpgradePrereleaseExtensions, 43 } 44 45 if cfg.InstallExtensions { 46 err := extension.InstallUpgradeTimescaleDBExtensions(connStr, extOptions) 47 if err != nil { 48 return nil, err 49 } 50 } 51 52 // Migration lock logic: 53 // We don't want to upgrade the schema version while we still have connectors 54 // attached who think the schema is at the old version. To prevent this, as 55 // best we can, each normal connection attempts to grab a (shared) advisory 56 // lock on schemaLockId, and we attempt to grab an exclusive lock on it 57 // before running migrate. This implies that migration must be run when no 58 // other connector is running. 59 schemaVersionLease, err := util.NewPgAdvisoryLock(schema.LockID, connStr) 60 if err != nil { 61 log.Error("msg", "error creating schema version lease", "err", err) 62 return nil, startupError 63 } 64 // after the client has started it's in charge of maintaining the leases 65 defer schemaVersionLease.Close() 66 67 migrationFailedDueToLockError := true 68 if cfg.Migrate { 69 conn, err := schemaVersionLease.Conn() 70 if err != nil { 71 return nil, fmt.Errorf("migration error: %w", err) 72 } 73 lease := schemaVersionLease 74 if !cfg.UseVersionLease { 75 lease = nil 76 } 77 err = SetupDBState(conn, appVersion, lease, extOptions) 78 migrationFailedDueToLockError = err == migrationLockError 79 if err != nil && err != migrationLockError { 80 return nil, fmt.Errorf("migration error: %w", err) 81 } 82 83 if cfg.StopAfterMigrate { 84 if err != nil { 85 return nil, err 86 } 87 log.Info("msg", "Migration successful, exiting") 88 return nil, nil 89 } 90 } else { 91 log.Info("msg", "Skipping migration") 92 } 93 94 if cfg.UseVersionLease { 95 // Grab a lease to protect version checking and client creation. This 96 // lease will be released when the schemaVersionLease is closed. 97 locked, err := schemaVersionLease.GetSharedAdvisoryLock() 98 if err != nil { 99 return nil, fmt.Errorf("could not acquire schema version lease due to: %w", err) 100 } 101 if !locked { 102 return nil, fmt.Errorf("could not acquire schema version lease. is a migration in progress?") 103 } 104 } else { 105 log.Warn("msg", "skipping schema version lease") 106 } 107 108 // Check the database is on the correct version. 109 // This must be done even if we attempted a migrate; if we failed to acquire 110 // the lock we'll start up as-if we were never supposed to migrate in the 111 // first place. This is also needed as checkDependencies populates our 112 // extension metadata 113 conn, err := schemaVersionLease.Conn() 114 if err != nil { 115 return nil, fmt.Errorf("Dependency checking error while trying to open DB connection: %w", err) 116 } 117 err = pgmodel.CheckDependencies(conn, appVersion, migrationFailedDueToLockError, extOptions) 118 if err != nil { 119 err = fmt.Errorf("dependency error: %w", err) 120 if migrationFailedDueToLockError { 121 log.Error("msg", "Unable to run migrations; failed to acquire the lock. If the database is on an incorrect version, ensure there are no other connectors running and try again.", "err", err) 122 } 123 return nil, err 124 } 125 126 if cfg.InstallExtensions { 127 // Only check for background workers if TimessaleDB is installed. 128 if notOk, err := isBGWLessThanDBs(conn); err != nil { 129 return nil, fmt.Errorf("Error checking the number of background workers: %w", err) 130 } else if notOk { 131 log.Warn("msg", "Maximum background worker setting is too low for the number of databases in your system. "+ 132 "Please increase your timescaledb.max_background_workers setting. See https://docs.timescale.com/latest/getting-started/configuring#workers for more information.") 133 } 134 } 135 136 isLicenseOSS, err := isTimescaleDBOSS(conn) 137 if err != nil { 138 return nil, fmt.Errorf("fetching license information: %w", err) 139 } 140 if isLicenseOSS { 141 log.Warn("msg", "WARNING: Using the Apache2 version of TimescaleDB. This version does not include "+ 142 "compression and thus performance and disk usage will be significantly negatively effected.") 143 } 144 145 // Election must be done after migration and version-checking: if we're on 146 // the wrong version we should not participate in leader-election. 147 elector, err = initElector(cfg, promMetrics) 148 149 if err != nil { 150 return nil, fmt.Errorf("elector init error: %w", err) 151 } 152 153 if (elector == nil && !cfg.APICfg.HighAvailability) && !cfg.APICfg.ReadOnly { 154 log.Info( 155 "msg", 156 "Prometheus HA is not enabled", 157 ) 158 } 159 160 leasingFunction := getSchemaLease 161 if !cfg.UseVersionLease { 162 leasingFunction = nil 163 } 164 165 multiTenancy := tenancy.NewNoopAuthorizer() 166 if cfg.TenancyCfg.EnableMultiTenancy { 167 multiTenancyConfig := tenancy.NewAllowAllTenantsConfig(cfg.TenancyCfg.AllowNonMTWrites) 168 if !cfg.TenancyCfg.SkipTenantValidation { 169 multiTenancyConfig = tenancy.NewSelectiveTenancyConfig(cfg.TenancyCfg.ValidTenantsList, cfg.TenancyCfg.AllowNonMTWrites) 170 } 171 multiTenancy, err = tenancy.NewAuthorizer(multiTenancyConfig) 172 if err != nil { 173 return nil, fmt.Errorf("new tenancy: %w", err) 174 } 175 cfg.APICfg.MultiTenancy = multiTenancy 176 } 177 178 // client has to be initiated after migrate since migrate 179 // can change database GUC settings 180 client, err := pgclient.NewClient(&cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly) 181 if err != nil { 182 return nil, fmt.Errorf("client creation error: %w", err) 183 } 184 185 return client, nil 186} 187 188func isTimescaleDBOSS(conn *pgx.Conn) (bool, error) { 189 var ( 190 isTimescaleDB bool 191 isLicenseOSS bool 192 ) 193 err := conn.QueryRow(context.Background(), "SELECT "+schema.Catalog+".is_timescaledb_installed()").Scan(&isTimescaleDB) 194 if err != nil { 195 return false, fmt.Errorf("error fetching whether TimescaleDB is installed: %w", err) 196 } 197 if !isTimescaleDB { 198 // Return false so that we don't warn for OSS TimescaleDB. 199 return false, nil 200 } 201 err = conn.QueryRow(context.Background(), "SELECT "+schema.Catalog+".is_timescaledb_oss()").Scan(&isLicenseOSS) 202 if err != nil { 203 return false, fmt.Errorf("error fetching TimescaleDB license: %w", err) 204 } 205 return isLicenseOSS, nil 206} 207 208// isBGWLessThanDBs checks if the background workers count is less than the database count. It should be 209// called only if TimescaleDB is installed. 210func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) { 211 var ( 212 dbs int 213 maxBGWStr string 214 ) 215 err := conn.QueryRow(context.Background(), "SHOW timescaledb.max_background_workers").Scan(&maxBGWStr) 216 if err != nil { 217 return false, fmt.Errorf("Unable to fetch timescaledb.max_background_workers: %w", err) 218 } 219 maxBGWs, err := strconv.Atoi(maxBGWStr) 220 if err != nil { 221 return false, fmt.Errorf("maxBGw string conversion: %w", err) 222 } 223 err = conn.QueryRow(context.Background(), "SELECT count(*) from pg_catalog.pg_database").Scan(&dbs) 224 if err != nil { 225 return false, fmt.Errorf("Unable to fetch count of all databases: %w", err) 226 } 227 if maxBGWs < dbs+2 { 228 return true, nil 229 } 230 return false, nil 231} 232 233func initElector(cfg *Config, metrics *api.Metrics) (*util.Elector, error) { 234 if cfg.HaGroupLockID == 0 { 235 return nil, nil 236 } 237 if cfg.PrometheusTimeout == -1 { 238 return nil, fmt.Errorf("Prometheus timeout configuration must be set when using PG advisory lock") 239 } 240 241 connStr := cfg.PgmodelCfg.GetConnectionStr() 242 lock, err := util.NewPgLeaderLock(cfg.HaGroupLockID, connStr, getSchemaLease) 243 if err != nil { 244 return nil, fmt.Errorf("Error creating advisory lock\nhaGroupLockId: %d\nerr: %s\n", cfg.HaGroupLockID, err) 245 } 246 log.Info("msg", "Initializing leader election based on PostgreSQL advisory lock") 247 scheduledElector := util.NewScheduledElector(lock, cfg.ElectionInterval) 248 if cfg.PrometheusTimeout != 0 { 249 go func() { 250 ticker := time.NewTicker(promLivenessCheck) 251 for range ticker.C { 252 lastReq := atomic.LoadInt64(&metrics.LastRequestUnixNano) 253 scheduledElector.PrometheusLivenessCheck(lastReq, cfg.PrometheusTimeout) 254 } 255 }() 256 } 257 return &scheduledElector.Elector, nil 258} 259 260func SetupDBState(conn *pgx.Conn, appVersion pgmodel.VersionInfo, leaseLock *util.PgAdvisoryLock, extOptions extension.ExtensionMigrateOptions) error { 261 // At startup migrators attempt to grab the schema-version lock. If this 262 // fails that means some other connector is running. All is not lost: some 263 // other connector may have migrated the DB to the correct version. We warn, 264 // then start the connector as normal. If we are on the wrong version, the 265 // normal version-check code will prevent us from running. 266 267 if leaseLock != nil { 268 locked, err := leaseLock.GetAdvisoryLock() 269 if err != nil { 270 return fmt.Errorf("error while acquiring migration lock %w", err) 271 } 272 if !locked { 273 return migrationLockError 274 } 275 defer func() { 276 _, err := leaseLock.Unlock() 277 if err != nil { 278 log.Error("msg", "error while releasing migration lock", "err", err) 279 } 280 }() 281 } else { 282 log.Warn("msg", "skipping migration lock") 283 } 284 285 err := pgmodel.Migrate(conn, appVersion, extOptions) 286 if err != nil { 287 return fmt.Errorf("Error while trying to migrate DB: %w", err) 288 } 289 290 installedPromscaleExtension, err := extension.InstallUpgradePromscaleExtensions(conn, extOptions) 291 if err != nil { 292 return err 293 } 294 295 pgmodel.UpdateTelemetry(conn, appVersion, installedPromscaleExtension) 296 return nil 297} 298 299func compileAnchoredRegexString(s string) (*regexp.Regexp, error) { 300 r, err := regexp.Compile("^(?:" + s + ")$") 301 if err != nil { 302 return nil, err 303 } 304 return r, nil 305} 306 307// Except for migration, every connection that communicates with the DB must be 308// guarded by an instante of the schema-version lease to ensure that no other 309// connector can migrate the DB out from under it. We do not bother to release 310// said lease; in such and event the connector will be shutdown anyway, and 311// connection-death will close the connection. 312func getSchemaLease(ctx context.Context, conn *pgx.Conn) error { 313 err := util.GetSharedLease(ctx, conn, schema.LockID) 314 if err != nil { 315 return err 316 } 317 return pgmodel.CheckSchemaVersion(ctx, conn, appVersion, false) 318} 319