1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package ingestor
6
7import (
8	"fmt"
9	"testing"
10	"time"
11
12	"github.com/jackc/pgtype"
13	"github.com/pkg/errors"
14	"github.com/prometheus/prometheus/pkg/labels"
15	"github.com/stretchr/testify/require"
16	"github.com/timescale/promscale/pkg/pgmodel/cache"
17	pgmodelErrs "github.com/timescale/promscale/pkg/pgmodel/common/errors"
18	"github.com/timescale/promscale/pkg/pgmodel/model"
19	pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
20	"github.com/timescale/promscale/pkg/pgmodel/model/pgutf8str"
21	"github.com/timescale/promscale/pkg/prompb"
22	tput "github.com/timescale/promscale/pkg/util/throughput"
23)
24
25func getTestLabelArray(t *testing.T, l [][]int32) *pgtype.ArrayType {
26	model.SetLabelArrayOIDForTest(0)
27	labelArrayArray := model.GetCustomType(model.LabelArray)
28	err := labelArrayArray.Set(l)
29	require.NoError(t, err)
30	return labelArrayArray
31}
32
33func init() {
34	tput.InitWatcher(time.Second)
35}
36
37type sVisitor []model.Insertable
38
39func (c sVisitor) VisitSeries(cb func(s *pgmodel.Series) error) error {
40	for _, insertable := range c {
41		err := cb(insertable.Series())
42		if err != nil {
43			return err
44		}
45	}
46	return nil
47}
48
49func TestPGXInserterInsertSeries(t *testing.T) {
50	testCases := []struct {
51		name       string
52		series     []labels.Labels
53		sqlQueries []model.SqlQuery
54	}{
55		{
56			name: "Zero series",
57		},
58		{
59			name: "One series",
60			series: []labels.Labels{
61				{
62					{Name: "name_1", Value: "value_1"},
63					{Name: "__name__", Value: "metric_1"},
64				},
65			},
66
67			sqlQueries: []model.SqlQuery{
68				{Sql: "BEGIN;"},
69				{
70					Sql:     "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
71					Args:    []interface{}(nil),
72					Results: model.RowResults{{int64(1)}},
73					Err:     error(nil),
74				},
75				{Sql: "COMMIT;"},
76				{Sql: "BEGIN;"},
77				{
78					Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)",
79					Args: []interface{}{
80						"metric_1",
81						[]string{"__name__", "name_1"},
82						[]string{"metric_1", "value_1"},
83					},
84					Results: model.RowResults{
85						{[]int32{1, 2}, []int32{1, 2}, []string{"__name__", "name_1"}, []string{"metric_1", "value_1"}},
86					},
87					Err: error(nil),
88				},
89				{Sql: "COMMIT;"},
90				{Sql: "BEGIN;"},
91				{
92					Sql: seriesInsertSQL,
93					Args: []interface{}{
94						"metric_1",
95						getTestLabelArray(t, [][]int32{{1, 2}}),
96					},
97					Results: model.RowResults{{int64(1), int64(1)}},
98					Err:     error(nil),
99				},
100				{Sql: "COMMIT;"},
101			},
102		},
103		{
104			name: "Two series",
105			series: []labels.Labels{
106				{
107					{Name: "name_1", Value: "value_1"},
108					{Name: "__name__", Value: "metric_1"},
109				},
110				{
111					{Name: "name_2", Value: "value_2"},
112					{Name: "__name__", Value: "metric_1"},
113				},
114			},
115			sqlQueries: []model.SqlQuery{
116				{Sql: "BEGIN;"},
117				{
118					Sql:     "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
119					Args:    []interface{}(nil),
120					Results: model.RowResults{{int64(1)}},
121					Err:     error(nil),
122				},
123				{Sql: "COMMIT;"},
124				{Sql: "BEGIN;"},
125				{
126					Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)",
127					Args: []interface{}{
128						"metric_1",
129						[]string{"__name__", "name_1", "name_2"},
130						[]string{"metric_1", "value_1", "value_2"},
131					},
132					Results: model.RowResults{
133						{[]int32{1, 2, 3}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_2"}, []string{"metric_1", "value_1", "value_2"}},
134					},
135					Err: error(nil),
136				},
137				{Sql: "COMMIT;"},
138				{Sql: "BEGIN;"},
139				{
140					Sql: seriesInsertSQL,
141					Args: []interface{}{
142						"metric_1",
143						getTestLabelArray(t, [][]int32{{1, 2}, {1, 0, 3}}),
144					},
145					Results: model.RowResults{{int64(1), int64(1)}, {int64(2), int64(2)}},
146					Err:     error(nil),
147				},
148				{Sql: "COMMIT;"},
149			},
150		},
151		{
152			name: "Double series",
153			series: []labels.Labels{
154				{
155					{Name: "name_1", Value: "value_1"},
156					{Name: "__name__", Value: "metric_1"}},
157				{
158					{Name: "name_2", Value: "value_2"},
159					{Name: "__name__", Value: "metric_1"}},
160				{
161					{Name: "name_1", Value: "value_1"},
162					{Name: "__name__", Value: "metric_1"},
163				},
164			},
165			sqlQueries: []model.SqlQuery{
166				{Sql: "BEGIN;"},
167				{
168					Sql:     "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
169					Args:    []interface{}(nil),
170					Results: model.RowResults{{int64(1)}},
171					Err:     error(nil),
172				},
173				{Sql: "COMMIT;"},
174				{Sql: "BEGIN;"},
175				{
176					Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)",
177					Args: []interface{}{
178						"metric_1",
179						[]string{"__name__", "name_1", "name_2"},
180						[]string{"metric_1", "value_1", "value_2"},
181					},
182					Results: model.RowResults{
183						{[]int32{1, 2, 3}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_2"}, []string{"metric_1", "value_1", "value_2"}},
184					},
185					Err: error(nil),
186				},
187				{Sql: "COMMIT;"},
188				{Sql: "BEGIN;"},
189				{
190					Sql: seriesInsertSQL,
191					Args: []interface{}{
192						"metric_1",
193						getTestLabelArray(t, [][]int32{{1, 2}, {1, 0, 3}, {1, 2}}),
194					},
195					Results: model.RowResults{{int64(1), int64(1)}, {int64(2), int64(2)}, {int64(1), int64(1)}},
196					Err:     error(nil),
197				},
198				{Sql: "COMMIT;"},
199			},
200		},
201		{
202			name: "Query err",
203			series: []labels.Labels{
204				{
205					{Name: "name_1", Value: "value_1"},
206					{Name: "__name__", Value: "metric_1"}},
207				{
208					{Name: "name_2", Value: "value_2"},
209					{Name: "__name__", Value: "metric_1"},
210				},
211			},
212			sqlQueries: []model.SqlQuery{
213				{Sql: "BEGIN;"},
214				{
215					Sql:     "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
216					Args:    []interface{}(nil),
217					Results: model.RowResults{{int64(1)}},
218					Err:     error(nil),
219				},
220				{Sql: "COMMIT;"},
221				{Sql: "BEGIN;"},
222				{
223					Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)",
224					Args: []interface{}{
225						"metric_1",
226						[]string{"__name__", "name_1", "name_2"},
227						[]string{"metric_1", "value_1", "value_2"},
228					},
229					Results: model.RowResults{
230						{int32(1), int32(1), "__name__", "metric_1"},
231						{int32(1), int32(2), "__name__", "metric_2"},
232						{int32(2), int32(3), "name_1", "value_1"},
233						{int32(3), int32(4), "name_2", "value_2"},
234					},
235					Err: fmt.Errorf("some query error"),
236				},
237				{Sql: "COMMIT;"},
238			},
239		},
240	}
241
242	for _, c := range testCases {
243		t.Run(c.name, func(t *testing.T) {
244			for i := range c.sqlQueries {
245				for j := range c.sqlQueries[i].Args {
246					if _, ok := c.sqlQueries[i].Args[j].([]string); ok {
247						tmp := &pgutf8str.TextArray{}
248						err := tmp.Set(c.sqlQueries[i].Args[j])
249						require.NoError(t, err)
250						c.sqlQueries[i].Args[j] = tmp
251					}
252				}
253			}
254			mock := model.NewSqlRecorder(c.sqlQueries, t)
255			scache := cache.NewSeriesCache(cache.DefaultConfig, nil)
256			scache.Reset()
257
258			sw := NewSeriesWriter(mock, 0)
259
260			lsi := make([]model.Insertable, 0)
261			for _, ser := range c.series {
262				ls, err := scache.GetSeriesFromLabels(ser)
263				if err != nil {
264					t.Errorf("invalid labels %+v, %v", ls, err)
265				}
266				lsi = append(lsi, model.NewPromExemplars(ls, nil))
267			}
268
269			err := sw.WriteSeries(sVisitor(lsi))
270			if err != nil {
271				foundErr := false
272				for _, q := range c.sqlQueries {
273					if q.Err != nil {
274						foundErr = true
275						if !errors.Is(err, q.Err) {
276							t.Errorf("unexpected query error:\ngot\n%s\nwanted\n%s", err, q.Err)
277						}
278					}
279				}
280				if !foundErr {
281					t.Errorf("unexpected error: %v", err)
282				}
283			}
284
285			if err == nil {
286				for _, si := range lsi {
287					si, se, err := si.Series().GetSeriesID()
288					require.NoError(t, err)
289					require.True(t, si > 0, "series id not set")
290					require.True(t, se > 0, "epoch not set")
291				}
292			}
293		})
294	}
295}
296
297func TestPGXInserterCacheReset(t *testing.T) {
298	series := []labels.Labels{
299		{
300			{Name: "__name__", Value: "metric_1"},
301			{Name: "name_1", Value: "value_1"},
302		},
303		{
304			{Name: "name_1", Value: "value_2"},
305			{Name: "__name__", Value: "metric_1"},
306		},
307	}
308
309	sqlQueries := []model.SqlQuery{
310
311		// first series cache fetch
312		{Sql: "BEGIN;"},
313		{
314			Sql:     "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
315			Args:    []interface{}(nil),
316			Results: model.RowResults{{int64(1)}},
317			Err:     error(nil),
318		},
319		{Sql: "COMMIT;"},
320		{Sql: "BEGIN;"},
321		{
322			Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)",
323			Args: []interface{}{
324				"metric_1",
325				[]string{"__name__", "name_1", "name_1"},
326				[]string{"metric_1", "value_1", "value_2"},
327			},
328			Results: model.RowResults{
329				{[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_1"}, []string{"metric_1", "value_1", "value_2"}},
330			},
331			Err: error(nil),
332		},
333		{Sql: "COMMIT;"},
334		{Sql: "BEGIN;"},
335		{
336			Sql: seriesInsertSQL,
337			Args: []interface{}{
338				"metric_1",
339				getTestLabelArray(t, [][]int32{{1, 2}, {1, 3}}),
340			},
341			Results: model.RowResults{{int64(1), int64(1)}, {int64(2), int64(2)}},
342			Err:     error(nil),
343		},
344		{Sql: "COMMIT;"},
345
346		// first labels cache refresh, does not trash
347		{
348			Sql:     "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
349			Args:    []interface{}(nil),
350			Results: model.RowResults{{int64(1)}},
351			Err:     error(nil),
352		},
353
354		// second labels cache refresh, trash the cache
355		{
356			Sql:     "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
357			Args:    []interface{}(nil),
358			Results: model.RowResults{{int64(2)}},
359			Err:     error(nil),
360		},
361		{Sql: "BEGIN;"},
362
363		// repopulate the cache
364		{
365			Sql:     "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
366			Args:    []interface{}(nil),
367			Results: model.RowResults{{int64(2)}},
368			Err:     error(nil),
369		},
370		{Sql: "COMMIT;"},
371		{Sql: "BEGIN;"},
372		{
373			Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)",
374			Args: []interface{}{
375				"metric_1",
376				[]string{"__name__", "name_1", "name_1"},
377				[]string{"metric_1", "value_1", "value_2"},
378			},
379			Results: model.RowResults{
380				{[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_1"}, []string{"metric_1", "value_1", "value_2"}},
381			},
382			Err: error(nil),
383		},
384		{Sql: "COMMIT;"},
385		{Sql: "BEGIN;"},
386		{
387			Sql: seriesInsertSQL,
388			Args: []interface{}{
389				"metric_1",
390				getTestLabelArray(t, [][]int32{{1, 2}, {1, 3}}),
391			},
392			Results: model.RowResults{{int64(3), int64(1)}, {int64(4), int64(2)}},
393			Err:     error(nil),
394		},
395		{Sql: "COMMIT;"},
396	}
397
398	for i := range sqlQueries {
399		for j := range sqlQueries[i].Args {
400			if _, ok := sqlQueries[i].Args[j].([]string); ok {
401				tmp := &pgutf8str.TextArray{}
402				err := tmp.Set(sqlQueries[i].Args[j])
403				require.NoError(t, err)
404				sqlQueries[i].Args[j] = tmp
405			}
406		}
407	}
408
409	mock := model.NewSqlRecorder(sqlQueries, t)
410	scache := cache.NewSeriesCache(cache.DefaultConfig, nil)
411
412	sw := NewSeriesWriter(mock, 0)
413	inserter := pgxDispatcher{
414		conn:   mock,
415		scache: scache,
416	}
417
418	makeSamples := func(series []labels.Labels) []model.Insertable {
419		lsi := make([]model.Insertable, 0)
420		for _, ser := range series {
421			ls, err := scache.GetSeriesFromLabels(ser)
422			if err != nil {
423				t.Errorf("invalid labels %+v, %v", ls, err)
424			}
425			lsi = append(lsi, model.NewPromSamples(ls, nil))
426		}
427		return lsi
428	}
429
430	samples := makeSamples(series)
431	err := sw.WriteSeries(sVisitor(samples))
432	if err != nil {
433		t.Fatal(err)
434	}
435
436	expectedIds := []model.SeriesID{
437		model.SeriesID(1),
438		model.SeriesID(2),
439	}
440
441	for index, si := range samples {
442		_, _, ok := si.Series().NameValues()
443		require.False(t, ok)
444		expectedId := expectedIds[index]
445		gotId, _, err := si.Series().GetSeriesID()
446		require.NoError(t, err)
447		if gotId != expectedId {
448			t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId)
449		}
450	}
451
452	// refreshing during the same epoch givesthe same IDs without checking the DB
453	_, err = inserter.refreshSeriesEpoch(1)
454	require.NoError(t, err)
455
456	samples = makeSamples(series)
457	err = sw.WriteSeries(sVisitor(samples))
458	if err != nil {
459		t.Fatal(err)
460	}
461
462	for index, si := range samples {
463		_, _, ok := si.Series().NameValues()
464		require.False(t, ok)
465		expectedId := expectedIds[index]
466		gotId, _, err := si.Series().GetSeriesID()
467		require.NoError(t, err)
468		if gotId != expectedId {
469			t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId)
470		}
471	}
472
473	// trash the cache
474	_, err = inserter.refreshSeriesEpoch(1)
475	require.NoError(t, err)
476
477	// retrying rechecks the DB and uses the new IDs
478	samples = makeSamples(series)
479	err = sw.WriteSeries(sVisitor(samples))
480	if err != nil {
481		t.Fatal(err)
482	}
483
484	expectedIds = []model.SeriesID{
485		model.SeriesID(3),
486		model.SeriesID(4),
487	}
488
489	for index, si := range samples {
490		_, _, ok := si.Series().NameValues()
491		require.False(t, ok)
492		expectedId := expectedIds[index]
493		gotId, _, err := si.Series().GetSeriesID()
494		require.NoError(t, err)
495		if gotId != expectedId {
496			t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId)
497		}
498	}
499}
500
501func TestPGXInserterInsertData(t *testing.T) {
502	makeLabel := func() *model.Series {
503		l := &model.Series{}
504		l.SetSeriesID(1, 1)
505		return l
506	}
507
508	testCases := []struct {
509		name          string
510		rows          map[string][]model.Insertable
511		sqlQueries    []model.SqlQuery
512		metricsGetErr error
513	}{
514		{
515			name: "Zero data",
516			sqlQueries: []model.SqlQuery{
517				{Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}},
518				{Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}},
519				{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
520			},
521		},
522		{
523			name: "One data",
524			rows: map[string][]model.Insertable{
525				"metric_0": {model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1))},
526			},
527			sqlQueries: []model.SqlQuery{
528				{Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}},
529				{Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}},
530				{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
531				{
532					Sql:     "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
533					Args:    []interface{}{"metric_0"},
534					Results: model.RowResults{{"metric_0", false}},
535					Err:     error(nil),
536				},
537				{
538					Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])",
539					Args: []interface{}{
540						"metric_0",
541						[]time.Time{time.Unix(0, 0)},
542						[]float64{0},
543						[]int64{1},
544					},
545					Results: model.RowResults{{int64(1)}},
546					Err:     error(nil),
547				},
548				{
549					Sql:     "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
550					Args:    []interface{}{int64(1)},
551					Results: model.RowResults{{[]byte{}}},
552					Err:     error(nil),
553				},
554			},
555		},
556		{
557			name: "Two data",
558			rows: map[string][]model.Insertable{
559				"metric_0": {
560					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
561					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
562				},
563			},
564			sqlQueries: []model.SqlQuery{
565				{Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}},
566				{Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}},
567				{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
568				{
569					Sql:     "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
570					Args:    []interface{}{"metric_0"},
571					Results: model.RowResults{{"metric_0", false}},
572					Err:     error(nil),
573				},
574
575				{
576					Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])",
577					Args: []interface{}{
578						"metric_0",
579						[]time.Time{time.Unix(0, 0), time.Unix(0, 0)},
580						[]float64{0, 0},
581						[]int64{1, 1},
582					},
583					Results: model.RowResults{{int64(1)}},
584					Err:     error(nil),
585				},
586				{
587					Sql:     "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
588					Args:    []interface{}{int64(1)},
589					Results: model.RowResults{{[]byte{}}},
590					Err:     error(nil),
591				},
592			},
593		},
594		{
595			name: "Create table error",
596			rows: map[string][]model.Insertable{
597				"metric_0": {
598					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
599					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
600					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
601					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
602					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
603				},
604			},
605			sqlQueries: []model.SqlQuery{
606				{Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}},
607				{Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}},
608				{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
609				{
610					Sql:     "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
611					Args:    []interface{}{"metric_0"},
612					Results: model.RowResults{{"metric_0", false}},
613					Err:     fmt.Errorf("create table error"),
614				},
615			},
616		},
617		{
618			name: "Epoch Error",
619			rows: map[string][]model.Insertable{
620				"metric_0": {
621					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
622				},
623			},
624			sqlQueries: []model.SqlQuery{
625				{Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}},
626				{Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}},
627				{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
628				{
629					Sql:     "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
630					Args:    []interface{}{"metric_0"},
631					Results: model.RowResults{{"metric_0", false}},
632					Err:     error(nil),
633				},
634
635				{
636					Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])",
637					Args: []interface{}{
638						"metric_0",
639						[]time.Time{time.Unix(0, 0)},
640						[]float64{0},
641						[]int64{1},
642					},
643					Results: model.RowResults{{int64(1)}},
644					Err:     error(nil),
645				},
646				{
647					//this is the attempt on the full batch
648					Sql:     "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
649					Args:    []interface{}{int64(1)},
650					Results: model.RowResults{{[]byte{}}},
651					Err:     fmt.Errorf("epoch error"),
652				},
653
654				{
655					Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])",
656					Args: []interface{}{
657						"metric_0",
658						[]time.Time{time.Unix(0, 0)},
659						[]float64{0},
660						[]int64{1},
661					},
662					Results: model.RowResults{{int64(1)}},
663					Err:     error(nil),
664				},
665				{
666					//this is the attempt on the individual copyRequests
667					Sql:     "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
668					Args:    []interface{}{int64(1)},
669					Results: model.RowResults{{[]byte{}}},
670					Err:     fmt.Errorf("epoch error"),
671				},
672			},
673		},
674		{
675			name: "Copy from error",
676			rows: map[string][]model.Insertable{
677				"metric_0": {
678					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
679					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
680					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
681					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
682					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
683				},
684			},
685
686			sqlQueries: []model.SqlQuery{
687				{Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}},
688				{Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}},
689				{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
690				{
691					Sql:     "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
692					Args:    []interface{}{"metric_0"},
693					Results: model.RowResults{{"metric_0", false}},
694					Err:     error(nil),
695				},
696
697				{
698					Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])",
699					Args: []interface{}{
700						"metric_0",
701						[]time.Time{time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0)},
702						make([]float64, 5),
703						[]int64{1, 1, 1, 1, 1},
704					},
705					Results: model.RowResults{{int64(1)}},
706					Err:     fmt.Errorf("some INSERT error"),
707				},
708				{
709					// this is the entire batch insert
710					Sql:     "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
711					Args:    []interface{}{int64(1)},
712					Results: model.RowResults{{[]byte{}}},
713					Err:     error(nil),
714				},
715
716				{
717					Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])",
718					Args: []interface{}{
719						"metric_0",
720						[]time.Time{time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0)},
721						make([]float64, 5),
722						[]int64{1, 1, 1, 1, 1},
723					},
724					Results: model.RowResults{{int64(1)}},
725					Err:     fmt.Errorf("some INSERT error"),
726				},
727				{
728					// this is the retry on individual copy requests
729					Sql:     "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
730					Args:    []interface{}{int64(1)},
731					Results: model.RowResults{{[]byte{}}},
732					Err:     error(nil),
733				},
734			},
735		},
736		{
737			name: "Can't find/create table in DB",
738			rows: map[string][]model.Insertable{
739				"metric_0": {
740					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
741					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
742					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
743					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
744					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
745				},
746			},
747			sqlQueries: []model.SqlQuery{
748				{Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}},
749				{Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}},
750				{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
751				{
752					Sql:  "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
753					Args: []interface{}{"metric_0"},
754					// no results is deliberate
755					Results: model.RowResults{},
756					Err:     error(nil),
757				},
758			},
759		},
760		{
761			//cache errors get recovered from and the insert succeeds
762			name: "Metrics cache get error",
763			rows: map[string][]model.Insertable{
764				"metric_0": {
765					model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)),
766				},
767			},
768			metricsGetErr: fmt.Errorf("some metrics error"),
769			sqlQueries: []model.SqlQuery{
770				{Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}},
771				{Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}},
772				{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
773				{
774					Sql:     "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
775					Args:    []interface{}{"metric_0"},
776					Results: model.RowResults{{"metric_0", true}},
777					Err:     error(nil),
778				},
779				{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
780				{
781					Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])",
782					Args: []interface{}{
783						"metric_0",
784						[]time.Time{time.Unix(0, 0)},
785						[]float64{0},
786						[]int64{1},
787					},
788					Results: model.RowResults{{int64(1)}},
789					Err:     error(nil),
790				},
791				{
792					Sql:     "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
793					Args:    []interface{}{int64(1)},
794					Results: model.RowResults{{[]byte{}}},
795					Err:     error(nil),
796				},
797			},
798		},
799	}
800
801	for _, co := range testCases {
802		c := co
803		t.Run(c.name, func(t *testing.T) {
804			mock := model.NewSqlRecorder(c.sqlQueries, t)
805			scache := cache.NewSeriesCache(cache.DefaultConfig, nil)
806
807			mockMetrics := &model.MockMetricCache{
808				MetricCache:  make(map[string]model.MetricInfo),
809				GetMetricErr: c.metricsGetErr,
810			}
811			err := mockMetrics.Set(
812				"prom_data",
813				"metric_1",
814				model.MetricInfo{
815					TableSchema: "prom_data",
816					TableName:   "metricTableName_1",
817					SeriesTable: "metric_1",
818				}, false)
819			if err != nil {
820				t.Fatalf("error setting up mock cache: %s", err.Error())
821			}
822			inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true})
823			if err != nil {
824				t.Fatal(err)
825			}
826			defer inserter.Close()
827
828			_, err = inserter.InsertTs(model.Data{Rows: c.rows})
829
830			var expErr error
831			switch {
832			case c.metricsGetErr != nil:
833				//cache errors recover
834				expErr = nil
835			case c.name == "Can't find/create table in DB":
836				expErr = pgmodelErrs.ErrMissingTableName
837			default:
838				for _, q := range c.sqlQueries {
839					if q.Err != nil {
840						expErr = q.Err
841					}
842				}
843			}
844
845			if err != nil {
846				if !errors.Is(err, expErr) {
847					t.Errorf("unexpected error:\ngot\n%s\nwanted\n%s", err, expErr)
848				}
849
850				return
851			}
852
853			if expErr != nil {
854				t.Errorf("expected error:\ngot\nnil\nwanted\n%s", expErr)
855			}
856
857			if len(c.rows) == 0 {
858				return
859			}
860		})
861	}
862}
863