package end_to_end_tests

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/require"
	"github.com/timescale/promscale/pkg/clockcache"
	"github.com/timescale/promscale/pkg/internal/testhelpers"
	"github.com/timescale/promscale/pkg/log"
	"github.com/timescale/promscale/pkg/pgmodel/cache"
	"github.com/timescale/promscale/pkg/pgmodel/common/schema"
	"github.com/timescale/promscale/pkg/pgmodel/lreader"
	pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
	"github.com/timescale/promscale/pkg/pgmodel/querier"
	"github.com/timescale/promscale/pkg/pgxconn"
	"github.com/timescale/promscale/pkg/prompb"
	"github.com/timescale/promscale/pkg/promql"
	"github.com/timescale/promscale/pkg/query"
)

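// TestContinuousAggDownsampling verifies that data downsampled into a
// TimescaleDB continuous aggregate can be registered as a Promscale metric
// view and queried through PromQL via the __schema__ and __column__ labels,
// and that dropping chunks from the raw metric and the cagg interacts
// correctly with series deletion.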
func TestContinuousAggDownsampling(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}
	if !*useTimescaleDB {
		t.Skip("continuous aggregates need TimescaleDB support")
	}
	if *useTimescaleOSS {
		t.Skip("continuous aggregates need non-OSS version of TimescaleDB")
	}
	if *useMultinode {
		t.Skip("continuous aggregates not supported in multinode TimescaleDB setup")
	}

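	// Each case queries the registered metric view "cagg". The __column__
	// matcher selects which aggregate column of the view (value, max, min,
	// avg) is read; when it is omitted, the view's "value" column is used.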
	testCases := []struct {
		name    string
		query   string
		startMs int64
		endMs   int64
		stepMs  int64
		res     promql.Result
	}{
		{
			name:    "Query non-existent column, empty result",
			query:   `cagg{__column__="nonexistent"}`,
			startMs: startTime,
			endMs:   endTime,
			stepMs:  360 * 1000,
			res: promql.Result{
				Value: promql.Matrix{},
			},
		},
		{
			name:    "Query default column",
			query:   `cagg{instance="1"}`,
			startMs: startTime,
			endMs:   startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
			stepMs:  3600 * 1000,
			res: promql.Result{
				Value: promql.Matrix{
					promql.Series{
						Metric: labels.Labels{
							labels.Label{Name: "__name__", Value: "cagg"},
							labels.Label{Name: "__schema__", Value: "cagg_schema"},
							labels.Label{Name: "foo", Value: "bat"},
							labels.Label{Name: "instance", Value: "1"},
						},
						Points: []promql.Point{
							promql.Point{T: 1577836800000, V: 952},
							promql.Point{T: 1577840400000, V: 1912},
							promql.Point{T: 1577844000000, V: 2872},
							promql.Point{T: 1577847600000, V: 3832},
						},
					},
				},
			},
		},
		{
			name:    "Query max column",
			query:   `cagg{__column__="max",instance="1"}`,
			startMs: startTime,
			endMs:   startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
			stepMs:  3600 * 1000,
			res: promql.Result{
				Value: promql.Matrix{
					promql.Series{
						Metric: labels.Labels{
							labels.Label{Name: "__column__", Value: "max"},
							labels.Label{Name: "__name__", Value: "cagg"},
							labels.Label{Name: "__schema__", Value: "cagg_schema"},
							labels.Label{Name: "foo", Value: "bat"},
							labels.Label{Name: "instance", Value: "1"},
						},
						Points: []promql.Point{
							promql.Point{T: 1577836800000, V: 952},
							promql.Point{T: 1577840400000, V: 1912},
							promql.Point{T: 1577844000000, V: 2872},
							promql.Point{T: 1577847600000, V: 3832},
						},
					},
				},
			},
		},
		{
			name:    "Query min column",
			query:   `cagg{__column__="min",instance="1"}`,
			startMs: startTime,
			endMs:   startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
			stepMs:  3600 * 1000,
			res: promql.Result{
				Value: promql.Matrix{
					promql.Series{
						Metric: labels.Labels{
							labels.Label{Name: "__column__", Value: "min"},
							labels.Label{Name: "__name__", Value: "cagg"},
							labels.Label{Name: "__schema__", Value: "cagg_schema"},
							labels.Label{Name: "foo", Value: "bat"},
							labels.Label{Name: "instance", Value: "1"},
						},
						Points: []promql.Point{
							promql.Point{T: 1577836800000, V: 0},
							promql.Point{T: 1577840400000, V: 960},
							promql.Point{T: 1577844000000, V: 1920},
							promql.Point{T: 1577847600000, V: 2880},
						},
					},
				},
			},
		},
		{
			name:    "Query avg column",
			query:   `cagg{__column__="avg",instance="1"}`,
			startMs: startTime,
			endMs:   startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value
			stepMs:  3600 * 1000,
			res: promql.Result{
				Value: promql.Matrix{
					promql.Series{
						Metric: labels.Labels{
							labels.Label{Name: "__column__", Value: "avg"},
							labels.Label{Name: "__name__", Value: "cagg"},
							labels.Label{Name: "__schema__", Value: "cagg_schema"},
							labels.Label{Name: "foo", Value: "bat"},
							labels.Label{Name: "instance", Value: "1"},
						},
						Points: []promql.Point{
							promql.Point{T: 1577836800000, V: 476},
							promql.Point{T: 1577840400000, V: 1436},
							promql.Point{T: 1577844000000, V: 2396},
							promql.Point{T: 1577847600000, V: 3356},
						},
					},
				},
			},
		},
	}

	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
		ts := []prompb.TimeSeries{
			{
				// This series will be deleted along with its label once the samples
				// have been deleted from both the raw metric and the cagg.
				Labels: []prompb.Label{
					{Name: pgmodel.MetricNameLabelName, Value: "metric_2"},
					{Name: "name1", Value: "value1"},
				},
				Samples: []prompb.Sample{
					// This sample will be dropped (note the - 1000: it lands before startTime).
					{Timestamp: startTime - 1000, Value: 0.1},
				},
			},
		}

		// Ingest test dataset.
		ingestQueryTestDataset(db, t, append(generateLargeTimeseries(), ts...))

		if _, err := db.Exec(context.Background(), "CALL _prom_catalog.finalize_metric_creation()"); err != nil {
			t.Fatalf("unexpected error while finalizing metric creation: %s", err)
		}
		if _, err := db.Exec(context.Background(), "CREATE SCHEMA cagg_schema"); err != nil {
			t.Fatalf("unexpected error while creating view schema: %s", err)
		}
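		// TimescaleDB 2.x creates continuous aggregates with CREATE MATERIALIZED
		// VIEW, while 1.x uses CREATE VIEW with refresh options plus an explicit
		// REFRESH MATERIALIZED VIEW to materialize the data.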
		if *useTimescale2 {
			if _, err := db.Exec(context.Background(),
				`CREATE MATERIALIZED VIEW cagg_schema.cagg(time, series_id, value, max, min, avg)
WITH (timescaledb.continuous) AS
  SELECT time_bucket('1hour', time), series_id, max(value) as value, max(value) as max, min(value) as min, avg(value) as avg
    FROM prom_data.metric_2
    GROUP BY time_bucket('1hour', time), series_id`); err != nil {
				t.Fatalf("unexpected error while creating metric view: %s", err)
			}
		} else {
			// Using TimescaleDB 1.x
			if _, err := db.Exec(context.Background(),
				`CREATE VIEW cagg_schema.cagg(time, series_id, value, max, min, avg)
WITH (timescaledb.continuous, timescaledb.ignore_invalidation_older_than = '1 min', timescaledb.refresh_lag = '-2h') AS
  SELECT time_bucket('1hour', time), series_id, max(value) as value, max(value) as max, min(value) as min, avg(value) as avg
    FROM prom_data.metric_2
    GROUP BY time_bucket('1hour', time), series_id`); err != nil {
				t.Fatalf("unexpected error while creating metric view: %s", err)
			}
			if _, err := db.Exec(context.Background(),
				`REFRESH MATERIALIZED VIEW cagg_schema.cagg`); err != nil {
				t.Fatalf("unexpected error while refreshing metric view: %s", err)
			}
		}
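		// Registering the view exposes it to the Promscale read path as metric
		// "cagg" in schema "cagg_schema", addressable via the __schema__ label.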
		if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg')"); err != nil {
			t.Fatalf("unexpected error while registering metric view: %s", err)
		}

		// Use a read-only connection to ensure the read path performs no writes.
		readOnly := testhelpers.GetReadOnlyConnection(t, *testDatabase)
		defer readOnly.Close()

		var tester *testing.T
		var ok bool
		if tester, ok = t.(*testing.T); !ok {
			t.Fatalf("Cannot run test, not an instance of testing.T")
			return
		}

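		// Build the read path: metric-name and label caches, a labels reader,
		// and a querier over the read-only connection, plus a PromQL engine to
		// execute the test queries.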
		mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
		lCache := clockcache.WithMax(100)
		dbConn := pgxconn.NewPgxConn(readOnly)
		labelsReader := lreader.NewLabelsReader(dbConn, lCache)
		r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
		queryable := query.NewQueryable(r, labelsReader)
		queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, []string{})
		if err != nil {
			t.Fatal(err)
		}

		for _, c := range testCases {
			tc := c
			tester.Run(tc.name, func(t *testing.T) {
				var qry promql.Query
				var err error

				if tc.stepMs == 0 {
					qry, err = queryEngine.NewInstantQuery(queryable, tc.query, model.Time(tc.endMs).Time())
				} else {
					qry, err = queryEngine.NewRangeQuery(queryable, tc.query, model.Time(tc.startMs).Time(), model.Time(tc.endMs).Time(), time.Duration(tc.stepMs)*time.Millisecond)
				}
				if err != nil {
					t.Fatal(err)
				}

				res := qry.Exec(context.Background())
				require.Equal(t, tc.res, *res)
			})
		}

		count := 0
		err = db.QueryRow(context.Background(), `SELECT count(*) FROM prom_data.metric_2`).Scan(&count)
		if err != nil {
			t.Error("error fetching count of raw metric timeseries", err)
		}

		seriesCount := 0
		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series`).Scan(&seriesCount)
		if err != nil {
			t.Error("error fetching series count:", err)
		}

		// Drop some raw metric data and check that the series data is not marked for deletion.
		// NOTE: we cannot drop all the raw data because we run into `too far behind` issues with 1.x caggs.
		_, err = db.Exec(context.Background(), "CALL _prom_catalog.drop_metric_chunks($1, $2, $3)", schema.Data, "metric_2", time.Unix(endTime/1000-20000, 0))
		if err != nil {
			t.Fatalf("unexpected error while dropping metric chunks: %s", err)
		}

		afterCount := 0
		err = db.QueryRow(context.Background(), `SELECT count(*) FROM prom_data.metric_2`).Scan(&afterCount)
		if err != nil {
			t.Error("error fetching count of raw metric timeseries", err)
		}
		if afterCount >= count {
			t.Errorf("unexpected row count: got %v, wanted less than %v", afterCount, count)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series`).Scan(&afterCount)
		if err != nil {
			t.Error("error fetching series count after drop:", err)
		}

		// None of the series should be removed.
		if afterCount != seriesCount {
			t.Errorf("unexpected series count: got %v, wanted %v", afterCount, seriesCount)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count)
		if err != nil {
			t.Error("error fetching series marked for deletion count", err)
		}

		// None of the series should be marked for deletion.
		if count != 0 {
			t.Errorf("unexpected series count: got %v, wanted 0", count)
		}

		// Count cagg rows before dropping.
		err = db.QueryRow(context.Background(), `SELECT count(*) FROM cagg_schema.cagg`).Scan(&count)
		if err != nil {
			t.Error("error fetching count of cagg metric timeseries", err)
		}

		// Drop all of the cagg data.
		_, err = db.Exec(context.Background(), "CALL _prom_catalog.drop_metric_chunks($1, $2, $3)", "cagg_schema", "cagg", time.Now())
		if err != nil {
			t.Fatalf("unexpected error while dropping cagg chunks: %s", err)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM cagg_schema.cagg`).Scan(&afterCount)
		if err != nil {
			t.Error("error fetching count of cagg metric timeseries", err)
		}
		if afterCount >= count {
			t.Errorf("unexpected row count: got %v, wanted less than %v", afterCount, count)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series`).Scan(&afterCount)
		if err != nil {
			t.Error("error fetching series count after drop:", err)
		}

		// None of the series should be removed.
		if afterCount != seriesCount {
			t.Errorf("unexpected series count: got %v, wanted %v", afterCount, seriesCount)
		}

		err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count)
		if err != nil {
			t.Error("error fetching series marked for deletion count", err)
		}

		// Now that chunks have been dropped from both the raw metric and the
		// cagg, the one series whose samples were completely removed should be
		// marked for deletion.
		if count != 1 {
			t.Errorf("unexpected series count: got %v, wanted 1", count)
		}
	})
}

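// TestContinuousAggDataRetention verifies that prom_api.execute_maintenance
// applies both a custom and the default retention period to the chunks
// backing a registered continuous aggregate.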
func TestContinuousAggDataRetention(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}
	if !*useTimescaleDB {
		t.Skip("continuous aggregates need TimescaleDB support")
	}
	if *useTimescaleOSS {
		t.Skip("continuous aggregates need non-OSS version of TimescaleDB")
	}
	if *useMultinode {
		t.Skip("continuous aggregates not supported in multinode TimescaleDB setup")
	}

	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
		dbJob := testhelpers.PgxPoolWithRole(t, *testDatabase, "prom_maintenance")
		defer dbJob.Close()
		dbSuper, err := pgxpool.Connect(context.Background(), testhelpers.PgConnectURL(*testDatabase, testhelpers.Superuser))
		require.NoError(t, err)
		defer dbSuper.Close()
		// A chunk way back in 2009.
		oldChunk := time.Date(2009, time.November, 11, 0, 0, 0, 0, time.UTC)
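		// A chunk roughly 100 days old: kept by the 180-day retention period
		// set for this metric below, but dropped once the metric falls back to
		// the (shorter) default retention period.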
		chunkInRetentionPolicy := time.Now().Add(-100 * 24 * time.Hour)

		ts := []prompb.TimeSeries{
			{
				Labels: []prompb.Label{
					{Name: pgmodel.MetricNameLabelName, Value: "test"},
					{Name: "name1", Value: "value1"},
				},
				Samples: []prompb.Sample{
					{Timestamp: int64(model.TimeFromUnixNano(oldChunk.UnixNano()) - 1), Value: 0.1},
					{Timestamp: int64(model.TimeFromUnixNano(chunkInRetentionPolicy.UnixNano()) - 1), Value: 0.1},
					{Timestamp: int64(model.TimeFromUnixNano(time.Now().UnixNano()) - 1), Value: 0.1},
				},
			},
		}
		ingestQueryTestDataset(db, t, ts)

		_, err = db.Exec(context.Background(), "CALL _prom_catalog.finalize_metric_creation()")
		require.NoError(t, err)
		_, err = db.Exec(context.Background(), "CREATE SCHEMA cagg_schema")
		require.NoError(t, err)

		if *useTimescale2 {
			_, err = db.Exec(context.Background(),
				`CREATE MATERIALIZED VIEW cagg_schema.cagg(time, series_id, value, max, min, avg)
WITH (timescaledb.continuous) AS
  SELECT time_bucket('1hour', time), series_id, max(value) as value, max(value) as max, min(value) as min, avg(value) as avg
    FROM prom_data.test
    GROUP BY time_bucket('1hour', time), series_id`)
			require.NoError(t, err)
		} else {
			// Using TimescaleDB 1.x
			_, err = db.Exec(context.Background(),
				`CREATE VIEW cagg_schema.cagg(time, series_id, value, max, min, avg)
WITH (timescaledb.continuous, timescaledb.max_interval_per_job = '1000 weeks', timescaledb.ignore_invalidation_older_than = '1 min') AS
  SELECT time_bucket('1hour', time), series_id, max(value) as value, max(value) as max, min(value) as min, avg(value) as avg
    FROM prom_data.test
    GROUP BY time_bucket('1hour', time), series_id`)
			require.NoError(t, err)
			_, err = db.Exec(context.Background(), `REFRESH MATERIALIZED VIEW cagg_schema.cagg`)
			require.NoError(t, err)
		}
		_, err = db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg')")
		require.NoError(t, err)

		_, err = db.Exec(context.Background(), "SELECT prom_api.set_metric_retention_period('cagg_schema', 'cagg', INTERVAL '180 days')")
		require.NoError(t, err)

		caggHypertable := ""
		err = db.QueryRow(context.Background(), "SELECT hypertable_relation FROM _prom_catalog.get_storage_hypertable_info('cagg_schema', 'cagg', true)").Scan(&caggHypertable)
		require.NoError(t, err)

		cnt := 0
		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Equal(t, 2, int(cnt), "expected the cagg to have exactly 2 chunks")

		_, err = dbJob.Exec(context.Background(), "CALL prom_api.execute_maintenance(log_verbose=>true)")
		require.NoError(t, err)

		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Equal(t, 1, int(cnt), "expected the cagg to have exactly 1 chunk left once the chunk outside the 180-day retention period set for this metric is dropped")

		_, err = db.Exec(context.Background(), "SELECT prom_api.reset_metric_retention_period('cagg_schema', 'cagg')")
		require.NoError(t, err)

		_, err = dbJob.Exec(context.Background(), "CALL prom_api.execute_maintenance(log_verbose=>true)")
		require.NoError(t, err)

		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Equal(t, 0, int(cnt), "expected the cagg to have no chunks: the only data left within the default retention period is too new to have been materialized")
	})
}

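// TestContinuousAgg2StepAgg verifies that a two-step aggregate built with the
// timescaledb_toolkit extension (a cagg storing intermediate time_weight
// state, finalized by a plain view) can be registered as a metric view and is
// subject to data retention.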
func TestContinuousAgg2StepAgg(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}
	if !*useTimescale2 {
		t.Skip("2-step continuous aggregates need TimescaleDB 2.x support")
	}
	if !*useExtension {
		t.Skip("2-step continuous aggregates need TimescaleDB 2.x HA image")
	}
	if *useTimescaleOSS {
		t.Skip("continuous aggregates need non-OSS version of TimescaleDB")
	}
	if *useMultinode {
		t.Skip("continuous aggregates not supported in multinode TimescaleDB setup")
	}

	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
		dbJob := testhelpers.PgxPoolWithRole(t, *testDatabase, "prom_maintenance")
		defer dbJob.Close()
		dbSuper, err := pgxpool.Connect(context.Background(), testhelpers.PgConnectURL(*testDatabase, testhelpers.Superuser))
		require.NoError(t, err)
		defer dbSuper.Close()
		_, err = dbSuper.Exec(context.Background(), "CREATE EXTENSION timescaledb_toolkit")
		require.NoError(t, err)

		// Ingest test dataset.
		ingestQueryTestDataset(db, t, generateLargeTimeseries())

		if _, err := db.Exec(context.Background(), "CALL _prom_catalog.finalize_metric_creation()"); err != nil {
			t.Fatalf("unexpected error while finalizing metric creation: %s", err)
		}
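		// Step 1 of the two-step aggregation: the cagg stores the intermediate
		// time_weight aggregate state per hour bucket instead of a final value.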
		if _, err := db.Exec(context.Background(),
			`CREATE MATERIALIZED VIEW twa_cagg(time, series_id, tw)
WITH (timescaledb.continuous) AS
  SELECT time_bucket('1hour', time), series_id, time_weight('Linear', time, value) as tw
    FROM prom_data.metric_2
    GROUP BY time_bucket('1hour', time), series_id`); err != nil {
			t.Fatalf("unexpected error while creating metric view: %s", err)
		}
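		// Step 2: a plain view applies the toolkit's average() accessor to
		// finalize the time-weighted average from the stored aggregate state.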
		if _, err := db.Exec(context.Background(),
			`CREATE VIEW tw_1hour(time, series_id, value) AS
  SELECT time, series_id, average(tw) as value
    FROM twa_cagg`); err != nil {
			t.Fatalf("unexpected error while creating metric view: %s", err)
		}

		if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('public', 'tw_1hour')"); err != nil {
			t.Fatalf("unexpected error while registering metric view: %s", err)
		}

		caggHypertable := ""
		err = db.QueryRow(context.Background(), "SELECT hypertable_relation FROM _prom_catalog.get_storage_hypertable_info('public', 'tw_1hour', true)").Scan(&caggHypertable)
		require.NoError(t, err)

		cnt := 0
		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Greater(t, int(cnt), 0, "expected the cagg to have at least one chunk")

		_, err = dbJob.Exec(context.Background(), "CALL prom_api.execute_maintenance(log_verbose=>true)")
		require.NoError(t, err)
		err = db.QueryRow(context.Background(), fmt.Sprintf(`SELECT count(*) FROM public.show_chunks('%s', older_than => NOW())`, caggHypertable)).Scan(&cnt)
		require.NoError(t, err)
		require.Equal(t, 0, int(cnt), "expected the cagg to have no chunks; all of its data lies outside the data retention period")
	})
}