1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package end_to_end_tests
6
7import (
8	"context"
9	"testing"
10
11	"github.com/jackc/pgx/v4/pgxpool"
12	"github.com/stretchr/testify/require"
13	ingstr "github.com/timescale/promscale/pkg/pgmodel/ingestor"
14	"github.com/timescale/promscale/pkg/pgmodel/model"
15	"github.com/timescale/promscale/pkg/pgxconn"
16	"github.com/timescale/promscale/pkg/prompb"
17)
18
19func TestInsertInCompressedChunks(t *testing.T) {
20	if *useTimescaleOSS {
21		t.Skip("compression not applicable in TimescaleDB-OSS")
22	}
23	ts := generateSmallTimeseries()
24	if !*useTimescaleDB {
25		// Ingest in plain postgres to ensure everything works well even if TimescaleDB is not installed.
26		withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
27			ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
28			require.NoError(t, err)
29			defer ingestor.Close()
30			_, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(ts)))
31			require.NoError(t, err)
32			r, err := db.Query(context.Background(), "SELECT * from prom_data.\"firstMetric\";")
33			require.NoError(t, err)
34			defer r.Close()
35
36			count := 0
37			for r.Next() {
38				count++
39			}
40			require.Equal(t, 5, count)
41		})
42		return
43	}
44
45	sample := []prompb.TimeSeries{
46		{
47			Labels: []prompb.Label{
48				{Name: model.MetricNameLabelName, Value: "firstMetric"},
49				{Name: "foo", Value: "bar"},
50				{Name: "common", Value: "tag"},
51				{Name: "empty", Value: ""},
52			},
53			Samples: []prompb.Sample{
54				{Timestamp: 7, Value: 0.7},
55			},
56		},
57	}
58	// With decompress chunks being true.
59	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
60		ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
61		require.NoError(t, err)
62		defer ingestor.Close()
63		_, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(ts)))
64		require.NoError(t, err)
65		err = ingestor.CompleteMetricCreation()
66		if err != nil {
67			t.Fatal(err)
68		}
69		_, err = db.Exec(context.Background(), "SELECT compress_chunk(i) from show_chunks('prom_data.\"firstMetric\"') i;")
70		require.NoError(t, err)
71
72		// Insert data into compressed chunk.
73		_, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(sample)))
74		require.NoError(t, err)
75
76		r, err := db.Query(context.Background(), "SELECT * from prom_data.\"firstMetric\";")
77		require.NoError(t, err)
78		defer r.Close()
79
80		count := 0
81		for r.Next() {
82			count++
83		}
84		require.Equal(t, 6, count)
85	})
86
87	// With decompress chunks being false.
88	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
89		ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingstr.Cfg{IgnoreCompressedChunks: true})
90		require.NoError(t, err)
91		defer ingestor.Close()
92		_, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(ts)))
93		require.NoError(t, err)
94		err = ingestor.CompleteMetricCreation()
95		if err != nil {
96			t.Fatal(err)
97		}
98		_, err = db.Exec(context.Background(), "SELECT compress_chunk(i) from show_chunks('prom_data.\"firstMetric\"') i;")
99		require.NoError(t, err)
100
101		// Insert data into compressed chunk.
102		_, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(sample)))
103		require.NoError(t, err)
104
105		r, err := db.Query(context.Background(), "SELECT * from prom_data.\"firstMetric\";")
106		require.NoError(t, err)
107		defer r.Close()
108
109		count := 0
110		for r.Next() {
111			count++
112		}
113		require.Equal(t, 5, count) // The recent sample did not get ingested. This is because the chunks were compressed and we were asked to not ingest into compressed chunks.
114	})
115}
116