package end_to_end_tests

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/require"
	"github.com/timescale/promscale/pkg/clockcache"
	"github.com/timescale/promscale/pkg/internal/testhelpers"
	"github.com/timescale/promscale/pkg/log"
	"github.com/timescale/promscale/pkg/pgmodel/cache"
	"github.com/timescale/promscale/pkg/pgmodel/common/schema"
	"github.com/timescale/promscale/pkg/pgmodel/lreader"
	pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
	"github.com/timescale/promscale/pkg/pgmodel/querier"
	"github.com/timescale/promscale/pkg/pgxconn"
	"github.com/timescale/promscale/pkg/prompb"
	"github.com/timescale/promscale/pkg/promql"
	"github.com/timescale/promscale/pkg/query"
)

// TestContinuousAggDownsampling verifies that a TimescaleDB continuous
// aggregate (cagg) registered as a Promscale metric view can be queried
// through PromQL — both the default value column and specific columns
// selected via the __column__ matcher — and that dropping chunks from the
// raw metric and then from the cagg only marks a series for deletion once
// its samples are gone from BOTH the raw hypertable and the cagg.
func TestContinuousAggDownsampling(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}
	if !*useTimescaleDB {
		t.Skip("continuous aggregates need TimescaleDB support")
	}
	if *useTimescaleOSS {
		t.Skip("continuous aggregates need non-OSS version of TimescaleDB")
	}
	if *useMultinode {
		t.Skip("continuous aggregates not supported in multinode TimescaleDB setup")
	}

	// Table-driven PromQL queries against the cagg view. stepMs == 0 would
	// mean an instant query (see the loop below); all cases here are range
	// queries. Expected points are hand-computed from the dataset produced
	// by generateLargeTimeseries(), bucketed by the cagg's 1-hour
	// time_bucket.
	testCases := []struct {
		name    string
		query   string
		startMs int64
		endMs   int64
		stepMs  int64
		res     promql.Result
	}{
		{
			name:    "Query non-existant column, empty result",
			query:   `cagg{__column__="nonexistant"}`,
			startMs: startTime,
			endMs:   endTime,
			stepMs:  360 * 1000,
			res: promql.Result{
				Value: promql.Matrix{},
			},
		},
		{
			// No __column__ matcher: the view's "value" column is used.
			name:    "Query default column",
			query:   `cagg{instance="1"}`,
			startMs: startTime,
			endMs:   startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
			stepMs:  3600 * 1000,
			res: promql.Result{
				Value: promql.Matrix{
					promql.Series{
						Metric: labels.Labels{
							labels.Label{Name: "__name__", Value: "cagg"},
							labels.Label{Name: "__schema__", Value: "cagg_schema"},
							labels.Label{Name: "foo", Value: "bat"},
							labels.Label{Name: "instance", Value: "1"},
						},
						Points: []promql.Point{
							promql.Point{T: 1577836800000, V: 952},
							promql.Point{T: 1577840400000, V: 1912},
							promql.Point{T: 1577844000000, V: 2872},
							promql.Point{T: 1577847600000, V: 3832},
						},
					},
				},
			},
		},
		{
			name:    "Query max column",
			query:   `cagg{__column__="max",instance="1"}`,
			startMs: startTime,
			endMs:   startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
			stepMs:  3600 * 1000,
			res: promql.Result{
				Value: promql.Matrix{
					promql.Series{
						Metric: labels.Labels{
							labels.Label{Name: "__column__", Value: "max"},
							labels.Label{Name: "__name__", Value: "cagg"},
							labels.Label{Name: "__schema__", Value: "cagg_schema"},
							labels.Label{Name: "foo", Value: "bat"},
							labels.Label{Name: "instance", Value: "1"},
						},
						Points: []promql.Point{
							promql.Point{T: 1577836800000, V: 952},
							promql.Point{T: 1577840400000, V: 1912},
							promql.Point{T: 1577844000000, V: 2872},
							promql.Point{T: 1577847600000, V: 3832},
						},
					},
				},
			},
		},
		{
			name:    "Query min column",
			query:   `cagg{__column__="min",instance="1"}`,
			startMs: startTime,
			endMs:   startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
			stepMs:  3600 * 1000,
			res: promql.Result{
				Value: promql.Matrix{
					promql.Series{
						Metric: labels.Labels{
							labels.Label{Name: "__column__", Value: "min"},
							labels.Label{Name: "__name__", Value: "cagg"},
							labels.Label{Name: "__schema__", Value: "cagg_schema"},
							labels.Label{Name: "foo", Value: "bat"},
							labels.Label{Name: "instance", Value: "1"},
						},
						Points: []promql.Point{
							promql.Point{T: 1577836800000, V: 0},
							promql.Point{T: 1577840400000, V: 960},
							promql.Point{T: 1577844000000, V: 1920},
							promql.Point{T: 1577847600000, V: 2880},
						},
					},
				},
			},
		},
		{
			name:    "Query avg column",
			query:   `cagg{__column__="avg",instance="1"}`,
			startMs: startTime,
			endMs:   startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
			stepMs:  3600 * 1000,
			res: promql.Result{
				Value: promql.Matrix{
					promql.Series{
						Metric: labels.Labels{
							labels.Label{Name: "__column__", Value: "avg"},
							labels.Label{Name: "__name__", Value: "cagg"},
							labels.Label{Name: "__schema__", Value: "cagg_schema"},
							labels.Label{Name: "foo", Value: "bat"},
							labels.Label{Name: "instance", Value: "1"},
						},
						Points: []promql.Point{
							promql.Point{T: 1577836800000, V: 476},
							promql.Point{T: 1577840400000, V: 1436},
							promql.Point{T: 1577844000000, V: 2396},
							promql.Point{T: 1577847600000, V: 3356},
						},
					},
				},
			},
		},
	}

	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
		ts := []prompb.TimeSeries{
			{
				// This series will be deleted along with its label once the samples
				// have been deleted from raw metric and cagg.
				Labels: []prompb.Label{
					{Name: pgmodel.MetricNameLabelName, Value: "metric_2"},
					{Name: "name1", Value: "value1"},
				},
				Samples: []prompb.Sample{
					//this will be dropped (notice the - 1000)
					{Timestamp: startTime - 1000, Value: 0.1},
				},
			},
		}

		// Ingest test dataset.
		ingestQueryTestDataset(db, t, append(generateLargeTimeseries(), ts...))

		if _, err := db.Exec(context.Background(), "CALL _prom_catalog.finalize_metric_creation()"); err != nil {
			t.Fatalf("unexpected error while ingesting test dataset: %s", err)
		}
		if _, err := db.Exec(context.Background(), "CREATE SCHEMA cagg_schema"); err != nil {
			t.Fatalf("unexpected error while creating view schema: %s", err)
		}
		// Continuous-aggregate DDL differs between TimescaleDB 2.x
		// (CREATE MATERIALIZED VIEW) and 1.x (CREATE VIEW + explicit
		// REFRESH, plus 1.x-only options).
		if *useTimescale2 {
			if _, err := db.Exec(context.Background(),
				`CREATE MATERIALIZED VIEW cagg_schema.cagg( time, series_id, value, max, min, avg)
WITH (timescaledb.continuous) AS
  SELECT time_bucket('1hour', time), series_id, max(value) as value, max(value) as max, min(value) as min, avg(value) as avg
    FROM prom_data.metric_2
    GROUP BY time_bucket('1hour', time), series_id`); err != nil {
				t.Fatalf("unexpected error while creating metric view: %s", err)
			}
		} else {
			// Using TimescaleDB 1.x
			if _, err := db.Exec(context.Background(),
				`CREATE VIEW cagg_schema.cagg( time, series_id, value, max, min, avg)
WITH (timescaledb.continuous, timescaledb.ignore_invalidation_older_than = '1 min', timescaledb.refresh_lag = '-2h') AS
  SELECT time_bucket('1hour', time), series_id, max(value) as value, max(value) as max, min(value) as min, avg(value) as avg
    FROM prom_data.metric_2
    GROUP BY time_bucket('1hour', time), series_id`); err != nil {
				t.Fatalf("unexpected error while creating metric view: %s", err)
			}
			if _, err := db.Exec(context.Background(),
				`REFRESH MATERIALIZED VIEW cagg_schema.cagg`); err != nil {
				t.Fatalf("unexpected error while creating metric view: %s", err)
			}
		}
		// Registering the view makes it queryable through PromQL as metric
		// "cagg" with __schema__="cagg_schema".
		if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg')"); err != nil {
			t.Fatalf("unexpected error while registering metric view: %s", err)
		}

		// Getting a read-only connection to ensure read path is idempotent.
		readOnly := testhelpers.GetReadOnlyConnection(t, *testDatabase)
		defer readOnly.Close()

		// Subtests (tester.Run below) need a concrete *testing.T, but
		// withDB hands us a testing.TB.
		var tester *testing.T
		var ok bool
		if tester, ok = t.(*testing.T); !ok {
			t.Fatalf("Cannot run test, not an instance of testing.T")
			return
		}

		// Wire up the read path: caches -> labels reader -> querier ->
		// queryable -> PromQL engine.
		mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
		lCache := clockcache.WithMax(100)
		dbConn := pgxconn.NewPgxConn(readOnly)
		labelsReader := lreader.NewLabelsReader(dbConn, lCache)
		r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
		queryable := query.NewQueryable(r, labelsReader)
		queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, []string{})
		if err != nil {
			t.Fatal(err)
		}

		for _, c := range testCases {
			tc := c // capture per-iteration copy for the closure
			tester.Run(c.name, func(t *testing.T) {
				var qry promql.Query
				var err error

				if tc.stepMs == 0 {
					qry, err = queryEngine.NewInstantQuery(queryable, c.query, model.Time(tc.endMs).Time())
				} else {
					qry, err = queryEngine.NewRangeQuery(queryable, tc.query, model.Time(tc.startMs).Time(), model.Time(tc.endMs).Time(), time.Duration(tc.stepMs)*time.Millisecond)
				}
				if err != nil {
					t.Fatal(err)
				}

				res := qry.Exec(context.Background())
				require.Equal(t, tc.res, *res)
			})
		}

		// Baseline counts before any chunks are dropped.
		count := 0
		err = db.QueryRow(context.Background(), `SELECT count(*) FROM prom_data.metric_2`).Scan(&count)
		if err != nil {
			t.Error("error fetching count of raw metric timeseries", err)
		}

		seriesCount := 0
		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series`).Scan(&seriesCount)
		if err != nil {
			t.Error("error fetching series count:", err)
		}

		// Drop some raw metric data and check that the series data is not marked for deletion.
		// NOTE: we cannot drop all the raw data because we are getting `too far behind` issues with 1.x caggs
		_, err = db.Exec(context.Background(), "CALL _prom_catalog.drop_metric_chunks($1, $2, $3)", schema.Data, "metric_2", time.Unix(endTime/1000-20000, 0))
		if err != nil {
			t.Fatalf("unexpected error while dropping metric chunks: %s", err)
		}

		afterCount := 0
		err = db.QueryRow(context.Background(), `SELECT count(*) FROM prom_data.metric_2`).Scan(&afterCount)
		if err != nil {
			t.Error("error fetching count of raw metric timeseries", err)
		}
		if afterCount >= count {
			t.Errorf("unexpected row count: got %v, wanted less then %v", afterCount, count)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series`).Scan(&afterCount)
		if err != nil {
			t.Error("error fetching series count after drop:", err)
		}

		// None of the series should be removed.
		if afterCount != seriesCount {
			t.Errorf("unexpected series count: got %v, wanted %v", afterCount, seriesCount)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count)
		if err != nil {
			t.Error("error fetching series marked for deletion count", err)
		}

		// None of the series should be marked for deletion: the cagg still
		// holds data for every series at this point.
		if count != 0 {
			t.Errorf("unexpected series count: got %v, wanted 0", count)
		}

		// Count cagg rows before dropping.
		err = db.QueryRow(context.Background(), `SELECT count(*) FROM cagg_schema.cagg`).Scan(&count)
		if err != nil {
			t.Error("error fetching count of raw metric timeseries", err)
		}

		// Drop all of the cagg data.
		_, err = db.Exec(context.Background(), "CALL _prom_catalog.drop_metric_chunks($1, $2, $3)", "cagg_schema", "cagg", time.Now())
		if err != nil {
			t.Fatalf("unexpected error while dropping cagg chunks: %s", err)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM cagg_schema.cagg`).Scan(&afterCount)
		if err != nil {
			t.Error("error fetching count of cagg metric timeseries", err)
		}
		if afterCount >= count {
			t.Errorf("unexpected row count: got %v, wanted less then %v", afterCount, count)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series`).Scan(&afterCount)
		if err != nil {
			t.Error("error fetching series count after drop:", err)
		}

		// None of the series should be removed.
		if afterCount != seriesCount {
			t.Errorf("unexpected series count: got %v, wanted %v", afterCount, seriesCount)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count)
		if err != nil {
			t.Error("error fetching series marked for deletion count", err)
		}

		// Now that the raw metric and cagg metric have been deleted, one series that was completely
		// deleted will be marked for deletion.
		if count != 1 {
			t.Errorf("unexpected series count: got %v, wanted 1", count)
		}
	})
}

// TestContinuousAggDataRetention verifies that per-metric retention
// periods set via prom_api.set_metric_retention_period are honored for
// registered cagg metric views by prom_api.execute_maintenance, and that
// resetting the retention period falls back to the default policy.
func TestContinuousAggDataRetention(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}
	if !*useTimescaleDB {
		t.Skip("continuous aggregates need TimescaleDB support")
	}
	if *useTimescaleOSS {
		t.Skip("continuous aggregates need non-OSS version of TimescaleDB")
	}
	if *useMultinode {
		t.Skip("continuous aggregates not supported in multinode TimescaleDB setup")
	}

	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
		// Maintenance must run under the prom_maintenance role, like in
		// production.
		dbJob := testhelpers.PgxPoolWithRole(t, *testDatabase, "prom_maintenance")
		defer dbJob.Close()
		dbSuper, err := pgxpool.Connect(context.Background(), testhelpers.PgConnectURL(*testDatabase, testhelpers.Superuser))
		require.NoError(t, err)
		defer dbSuper.Close()
		//a chunk way back in 2009
		oldChunk := time.Date(2009, time.November, 11, 0, 0, 0, 0, time.UTC)
		// A chunk inside the 180-day retention period set below but
		// outside no window the default policy would keep.
		chunkInRetentionPolicy := time.Now().Add(-100 * 24 * time.Hour)

		// One sample per chunk: ancient, ~100 days old, and "now".
		ts := []prompb.TimeSeries{
			{
				Labels: []prompb.Label{
					{Name: pgmodel.MetricNameLabelName, Value: "test"},
					{Name: "name1", Value: "value1"},
				},
				Samples: []prompb.Sample{
					{Timestamp: int64(model.TimeFromUnixNano(oldChunk.UnixNano()) - 1), Value: 0.1},
					{Timestamp: int64(model.TimeFromUnixNano(chunkInRetentionPolicy.UnixNano()) - 1), Value: 0.1},
					{Timestamp: int64(model.TimeFromUnixNano(time.Now().UnixNano()) - 1), Value: 0.1},
				},
			},
		}
		ingestQueryTestDataset(db, t, ts)

		_, err = db.Exec(context.Background(), "CALL _prom_catalog.finalize_metric_creation()")
		require.NoError(t, err)
		_, err = db.Exec(context.Background(), "CREATE SCHEMA cagg_schema")
		require.NoError(t, err)

		// Version-specific cagg DDL, as in TestContinuousAggDownsampling.
		if *useTimescale2 {
			_, err = db.Exec(context.Background(),
				`CREATE MATERIALIZED VIEW cagg_schema.cagg( time, series_id, value, max, min, avg)
WITH (timescaledb.continuous) AS
  SELECT time_bucket('1hour', time), series_id, max(value) as value, max(value) as max, min(value) as min, avg(value) as avg
    FROM prom_data.test
    GROUP BY time_bucket('1hour', time), series_id`)
			require.NoError(t, err)
		} else {
			// Using TimescaleDB 1.x
			_, err = db.Exec(context.Background(),
				`CREATE VIEW cagg_schema.cagg( time, series_id, value, max, min, avg)
WITH (timescaledb.continuous, timescaledb.max_interval_per_job = '1000 weeks', timescaledb.ignore_invalidation_older_than = '1 min') AS
  SELECT time_bucket('1hour', time), series_id, max(value) as value, max(value) as max, min(value) as min, avg(value) as avg
    FROM prom_data.test
    GROUP BY time_bucket('1hour', time), series_id`)
			require.NoError(t, err)
			_, err = db.Exec(context.Background(), `REFRESH MATERIALIZED VIEW cagg_schema.cagg`)
			require.NoError(t, err)
		}
		_, err = db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg')")
		require.NoError(t, err)

		_, err = db.Exec(context.Background(), "SELECT prom_api.set_metric_retention_period('cagg_schema', 'cagg', INTERVAL '180 days')")
		require.NoError(t, err)

		// Resolve the cagg's backing hypertable so we can count its chunks.
		caggHypertable := ""
		err = db.QueryRow(context.Background(), "SELECT hypertable_relation FROM _prom_catalog.get_storage_hypertable_info('cagg_schema', 'cagg', true)").Scan(&caggHypertable)
		require.NoError(t, err)

		// NOTE(review): the hypertable identifier is interpolated into SQL
		// via Sprintf rather than bound as a parameter; it comes from the
		// catalog and this is test-only code, so injection is presumably
		// not a concern here — confirm if this pattern is copied elsewhere.
		cnt := 0
		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Equal(t, 2, int(cnt), "Expected for cagg to have exactly 2 chunks")

		_, err = dbJob.Exec(context.Background(), "CALL prom_api.execute_maintenance(log_verbose=>true)")
		require.NoError(t, err)

		// Maintenance should drop only the 2009 chunk: the ~100-day chunk
		// is inside the 180-day retention period.
		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Equal(t, 1, int(cnt), "Expected for cagg to have exactly 1 chunk that is outside of 180 day retention period previously set for this metric")

		_, err = db.Exec(context.Background(), "SELECT prom_api.reset_metric_retention_period('cagg_schema', 'cagg')")
		require.NoError(t, err)

		_, err = dbJob.Exec(context.Background(), "CALL prom_api.execute_maintenance(log_verbose=>true)")
		require.NoError(t, err)

		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Equal(t, 0, int(cnt), "Expected for cagg to have exactly 0 chunks since only data left in default retention period is too new to materialize")
	})
}

// TestContinuousAgg2StepAgg verifies that a two-step aggregate built with
// the timescaledb_toolkit extension (a cagg storing a time_weight
// aggregate, finalized by a plain view calling average()) can be
// registered as a metric view and is subject to data retention via
// prom_api.execute_maintenance.
func TestContinuousAgg2StepAgg(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}
	if !*useTimescale2 {
		t.Skip("2-step continuous aggregates need TimescaleDB 2.x support")
	}
	if !*useExtension {
		t.Skip("2-step continuous aggregates need TimescaleDB 2.x HA image")
	}
	if *useTimescaleOSS {
		t.Skip("continuous aggregates need non-OSS version of TimescaleDB")
	}
	if *useMultinode {
		t.Skip("continuous aggregates not supported in multinode TimescaleDB setup")
	}

	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
		dbJob := testhelpers.PgxPoolWithRole(t, *testDatabase, "prom_maintenance")
		defer dbJob.Close()
		dbSuper, err := pgxpool.Connect(context.Background(), testhelpers.PgConnectURL(*testDatabase, testhelpers.Superuser))
		require.NoError(t, err)
		defer dbSuper.Close()
		// Installing the toolkit extension requires superuser.
		_, err = dbSuper.Exec(context.Background(), "CREATE EXTENSION timescaledb_toolkit")
		require.NoError(t, err)

		// Ingest test dataset.
		ingestQueryTestDataset(db, t, generateLargeTimeseries())

		if _, err := db.Exec(context.Background(), "CALL _prom_catalog.finalize_metric_creation()"); err != nil {
			t.Fatalf("unexpected error while ingesting test dataset: %s", err)
		}
		// Step 1: cagg stores the partial time_weight aggregate state.
		if _, err := db.Exec(context.Background(),
			`CREATE MATERIALIZED VIEW twa_cagg( time, series_id, tw)
WITH (timescaledb.continuous) AS
  SELECT time_bucket('1hour', time), series_id, time_weight('Linear', time, value) as tw
    FROM prom_data.metric_2
    GROUP BY time_bucket('1hour', time), series_id`); err != nil {
			t.Fatalf("unexpected error while creating metric view: %s", err)
		}
		// Step 2: plain view finalizes the aggregate via average().
		if _, err := db.Exec(context.Background(),
			`CREATE VIEW tw_1hour( time, series_id, value) AS
  SELECT time, series_id, average(tw) as value
    FROM twa_cagg`); err != nil {
			t.Fatalf("unexpected error while creating metric view: %s", err)
		}

		if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('public', 'tw_1hour')"); err != nil {
			t.Fatalf("unexpected error while registering metric view: %s", err)
		}

		caggHypertable := ""
		err = db.QueryRow(context.Background(), "SELECT hypertable_relation FROM _prom_catalog.get_storage_hypertable_info('public', 'tw_1hour', true)").Scan(&caggHypertable)
		require.NoError(t, err)

		cnt := 0
		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Greater(t, int(cnt), 0, "Expected for cagg to have at least one chunk")

		// The test dataset is entirely outside the default retention
		// period, so maintenance should drop every chunk.
		_, err = dbJob.Exec(context.Background(), "CALL prom_api.execute_maintenance(log_verbose=>true)")
		require.NoError(t, err)
		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Equal(t, 0, int(cnt), "Expected for cagg to have no chunks, all outside of data retention period")
	})
}