1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4 5package end_to_end_tests 6 7import ( 8 "context" 9 "testing" 10 11 "github.com/jackc/pgx/v4/pgxpool" 12 "github.com/stretchr/testify/require" 13 ingstr "github.com/timescale/promscale/pkg/pgmodel/ingestor" 14 "github.com/timescale/promscale/pkg/pgmodel/model" 15 "github.com/timescale/promscale/pkg/pgxconn" 16 "github.com/timescale/promscale/pkg/prompb" 17) 18 19func TestInsertInCompressedChunks(t *testing.T) { 20 if *useTimescaleOSS { 21 t.Skip("compression not applicable in TimescaleDB-OSS") 22 } 23 ts := generateSmallTimeseries() 24 if !*useTimescaleDB { 25 // Ingest in plain postgres to ensure everything works well even if TimescaleDB is not installed. 26 withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { 27 ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) 28 require.NoError(t, err) 29 defer ingestor.Close() 30 _, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(ts))) 31 require.NoError(t, err) 32 r, err := db.Query(context.Background(), "SELECT * from prom_data.\"firstMetric\";") 33 require.NoError(t, err) 34 defer r.Close() 35 36 count := 0 37 for r.Next() { 38 count++ 39 } 40 require.Equal(t, 5, count) 41 }) 42 return 43 } 44 45 sample := []prompb.TimeSeries{ 46 { 47 Labels: []prompb.Label{ 48 {Name: model.MetricNameLabelName, Value: "firstMetric"}, 49 {Name: "foo", Value: "bar"}, 50 {Name: "common", Value: "tag"}, 51 {Name: "empty", Value: ""}, 52 }, 53 Samples: []prompb.Sample{ 54 {Timestamp: 7, Value: 0.7}, 55 }, 56 }, 57 } 58 // With decompress chunks being true. 59 withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { 60 ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) 61 require.NoError(t, err) 62 defer ingestor.Close() 63 _, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(ts))) 64 require.NoError(t, err) 65 err = ingestor.CompleteMetricCreation() 66 if err != nil { 67 t.Fatal(err) 68 } 69 _, err = db.Exec(context.Background(), "SELECT compress_chunk(i) from show_chunks('prom_data.\"firstMetric\"') i;") 70 require.NoError(t, err) 71 72 // Insert data into compressed chunk. 73 _, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(sample))) 74 require.NoError(t, err) 75 76 r, err := db.Query(context.Background(), "SELECT * from prom_data.\"firstMetric\";") 77 require.NoError(t, err) 78 defer r.Close() 79 80 count := 0 81 for r.Next() { 82 count++ 83 } 84 require.Equal(t, 6, count) 85 }) 86 87 // With decompress chunks being false. 88 withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { 89 ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingstr.Cfg{IgnoreCompressedChunks: true}) 90 require.NoError(t, err) 91 defer ingestor.Close() 92 _, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(ts))) 93 require.NoError(t, err) 94 err = ingestor.CompleteMetricCreation() 95 if err != nil { 96 t.Fatal(err) 97 } 98 _, err = db.Exec(context.Background(), "SELECT compress_chunk(i) from show_chunks('prom_data.\"firstMetric\"') i;") 99 require.NoError(t, err) 100 101 // Insert data into compressed chunk. 102 _, _, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(sample))) 103 require.NoError(t, err) 104 105 r, err := db.Query(context.Background(), "SELECT * from prom_data.\"firstMetric\";") 106 require.NoError(t, err) 107 defer r.Close() 108 109 count := 0 110 for r.Next() { 111 count++ 112 } 113 require.Equal(t, 5, count) // The recent sample did not get ingested. This is because the chunks were compressed and we were asked to not ingest into compressed chunks. 114 }) 115} 116