// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package ingestor

import (
	"context"
	"fmt"
	"math"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgconn"
	"github.com/prometheus/common/model"
	"github.com/timescale/promscale/pkg/log"
	"github.com/timescale/promscale/pkg/pgmodel/common/schema"
	"github.com/timescale/promscale/pkg/pgmodel/metrics"
	pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
	"github.com/timescale/promscale/pkg/pgxconn"
)

const maxInsertStmtPerTxn = 100

type copyRequest struct {
	data  *pendingBuffer
	table string
}

var (
	getBatchMutex       = &sync.Mutex{}
	handleDecompression = retryAfterDecompression
)
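
// handleDecompression is a package-level variable so the decompression
// strategy can be swapped out (e.g., to skipDecompression when decompression
// is disabled). A hypothetical wiring sketch; configureCopier and its flag
// are illustrative, not part of this package:
/*
func configureCopier(decompressionEnabled bool) {
	if decompressionEnabled {
		handleDecompression = retryAfterDecompression
	} else {
		handleDecompression = skipDecompression
	}
}
*/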

type copyBatch []copyRequest

func (reqs copyBatch) VisitSeries(callBack func(s *pgmodel.Series) error) error {
	for _, req := range reqs {
		insertables := req.data.batch.Data()
		for i := range insertables {
			if err := callBack(insertables[i].Series()); err != nil {
				return err
			}
		}
	}
	return nil
}

func (reqs copyBatch) VisitExemplar(callBack func(s *pgmodel.PromExemplars) error) error {
	for _, req := range reqs {
		insertables := req.data.batch.Data()
		for i := range insertables {
			exemplar, ok := insertables[i].(*pgmodel.PromExemplars)
			if ok {
				if err := callBack(exemplar); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
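
// Illustrative use of the visitors above (hypothetical helper, not called
// anywhere): counting how many insertables in a batch carry exemplars.
/*
func countExemplars(reqs copyBatch) (n int) {
	_ = reqs.VisitExemplar(func(e *pgmodel.PromExemplars) error {
		n++
		return nil
	})
	return n
}
*/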

// runCopier handles the actual insertion into the DB.
// We run one of these per database connection reserved for insertion.
func runCopier(conn pgxconn.PgxConn, in chan readRequest, sw *seriesWriter, elf *ExemplarLabelFormatter) {
	requestBatch := make([]readRequest, 0, maxInsertStmtPerTxn)
	insertBatch := make([]copyRequest, 0, cap(requestBatch))
	for {
		var ok bool

		// Fetch the batch of requests upfront to make sure all of the
		// requests fetched are for unique metrics. This is ensured by the
		// fact that the batcher only has one outstanding readRequest at a
		// time and the fact that we fetch the entire batch before executing
		// any of the reads. This guarantees that we never batch the same
		// metric twice in the copier.
		requestBatch, ok = copierGetBatch(requestBatch, in)
		if !ok {
			return
		}

		for i := range requestBatch {
			copyRequest, ok := <-requestBatch[i].copySender
			if !ok {
				continue
			}
			insertBatch = append(insertBatch, copyRequest)
		}

		// Sort by table name so that every copier acquires table locks in
		// the same order, preventing deadlocks.
		sort.Slice(insertBatch, func(i, j int) bool {
			return insertBatch[i].table < insertBatch[j].table
		})

		err := persistBatch(conn, sw, elf, insertBatch)
		if err != nil {
			for i := range insertBatch {
				insertBatch[i].data.reportResults(err)
				insertBatch[i].data.release()
			}
		}
		for i := range requestBatch {
			requestBatch[i] = readRequest{}
		}
		for i := range insertBatch {
			insertBatch[i] = copyRequest{}
		}
		insertBatch = insertBatch[:0]
		requestBatch = requestBatch[:0]
	}
}
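
// Illustrative wiring (hypothetical names; the real setup lives in the
// ingestor's dispatcher): one copier goroutine per reserved connection, all
// reading from the same channel so work is shared.
/*
for i := 0; i < numCopiers; i++ {
	go runCopier(conn, copierReadRequestCh, seriesWriter, exemplarFormatter)
}
*/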

func persistBatch(conn pgxconn.PgxConn, sw *seriesWriter, elf *ExemplarLabelFormatter, insertBatch []copyRequest) error {
	batch := copyBatch(insertBatch)
	err := sw.WriteSeries(batch)
	if err != nil {
		return fmt.Errorf("copier: writing series: %w", err)
	}
	err = elf.orderExemplarLabelValues(batch)
	if err != nil {
		return fmt.Errorf("copier: formatting exemplar label values: %w", err)
	}

	doInsertOrFallback(conn, insertBatch...)
	return nil
}

func copierGetBatch(batch []readRequest, in <-chan readRequest) ([]readRequest, bool) {
	// This mutex is not for safety, but rather for better batching.
	// It guarantees that only one copier is reading from the channel at a
	// time. This yields bigger batches and spreads a single HTTP request
	// across fewer copiers, decreasing latency via less synchronization
	// overhead, especially on low-pressure systems.
	getBatchMutex.Lock()
	defer getBatchMutex.Unlock()
	req, ok := <-in
	if !ok {
		return batch, false
	}
	batch = append(batch, req)

	// We use a small timeout to prevent low-pressure systems from using up
	// too many txns and putting pressure on the system.
	timeout := time.After(20 * time.Millisecond)
hot_gather:
	for len(batch) < cap(batch) {
		select {
		case r2 := <-in:
			batch = append(batch, r2)
		case <-timeout:
			break hot_gather
		}
	}
	return batch, true
}
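
// The hot_gather loop above is a gather-with-timeout pattern: after one
// blocking receive, drain opportunistically until the batch is full or a
// short timer fires. A minimal generic sketch of the drain step (assumes
// Go 1.18+ generics; illustrative only, not used by the copier):
/*
func gather[T any](in <-chan T, batch []T, wait time.Duration) []T {
	timeout := time.After(wait)
	for len(batch) < cap(batch) {
		select {
		case v, ok := <-in:
			if !ok {
				return batch
			}
			batch = append(batch, v)
		case <-timeout:
			return batch
		}
	}
	return batch
}
*/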

func doInsertOrFallback(conn pgxconn.PgxConn, reqs ...copyRequest) {
	err, _ := insertSeries(conn, reqs...)
	if err != nil {
		insertBatchErrorFallback(conn, reqs...)
		return
	}

	for i := range reqs {
		reqs[i].data.reportResults(nil)
		reqs[i].data.release()
	}
}

func insertBatchErrorFallback(conn pgxconn.PgxConn, reqs ...copyRequest) {
	for i := range reqs {
		err, minTime := insertSeries(conn, reqs[i])
		if err != nil {
			err = tryRecovery(conn, err, reqs[i], minTime)
		}

		reqs[i].data.reportResults(err)
		reqs[i].data.release()
	}
}

// We can currently recover from one error: if we inserted into a compressed
// chunk, we decompress the chunk and try again. Since a single batch can have
// both errors, we need to remember the insert method we're using so that we
// deduplicate if needed.
func tryRecovery(conn pgxconn.PgxConn, err error, req copyRequest, minTime int64) error {
	// We only recover from Postgres errors right now.
	pgErr, ok := err.(*pgconn.PgError)
	if !ok {
		errMsg := err.Error()
		log.Warn("msg", fmt.Sprintf("unexpected error while inserting to %s", req.table), "err", errMsg)
		return err
	}

	if pgErr.Code == "0A000" || strings.Contains(pgErr.Message, "compressed") || strings.Contains(pgErr.Message, "insert/update/delete not permitted") {
		// If the error was that the table is already compressed, decompress and try again.
		return handleDecompression(conn, req, minTime)
	}

	log.Warn("msg", fmt.Sprintf("unexpected postgres error while inserting to %s", req.table), "err", pgErr.Error())
	return pgErr
}
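
// Note: the plain type assertion above misses wrapped errors. If insertSeries
// ever starts wrapping the errors it returns, errors.As would be the safer
// check; a sketch (would require the standard "errors" import):
/*
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
	// inspect pgErr.Code / pgErr.Message as above
}
*/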

func skipDecompression(_ pgxconn.PgxConn, _ copyRequest, _ int64) error {
	log.WarnRateLimited("msg", "Rejecting samples falling on compressed chunks as decompression is disabled")
	return nil
}

// In the event we're filling in old data and the chunk we want to INSERT into
// has already been compressed, we decompress the chunk and try again. When we
// do this we delay the recompression to give us time to insert additional
// data.
func retryAfterDecompression(conn pgxconn.PgxConn, req copyRequest, minTimeInt int64) error {
	var (
		table   = req.table
		minTime = model.Time(minTimeInt).Time()
	)
	// How much faster are we at ingestion than wall-clock time?
	ingestSpeedup := 2
	// Delay the next compression job proportionally to the duration between
	// now and the data time, plus a constant safety margin.
	delayBy := (time.Since(minTime) / time.Duration(ingestSpeedup)) + 60*time.Minute
	maxDelayBy := time.Hour * 24
	if delayBy > maxDelayBy {
		delayBy = maxDelayBy
	}
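	// For example, data that is 4h old delays the job by 4h/2 + 1h = 3h,
	// while anything older than 46h hits the 24h cap.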
	log.Warn("msg", fmt.Sprintf("Table %s was compressed, decompressing", table), "table", table, "min-time", minTime, "age", time.Since(minTime), "delay-job-by", delayBy)

	_, rescheduleErr := conn.Exec(context.Background(), "SELECT "+schema.Catalog+".delay_compression_job($1, $2)",
		table, time.Now().Add(delayBy))
	if rescheduleErr != nil {
		log.Error("msg", rescheduleErr, "context", "Rescheduling compression")
		return rescheduleErr
	}

	_, decompressErr := conn.Exec(context.Background(), "CALL "+schema.Catalog+".decompress_chunks_after($1, $2);", table, minTime)
	if decompressErr != nil {
		log.Error("msg", decompressErr, "context", "Decompressing chunks")
		return decompressErr
	}

	metrics.DecompressCalls.Inc()
	metrics.DecompressEarliest.WithLabelValues(table).Set(float64(minTime.UnixNano()) / 1e9)
	err, _ := insertSeries(conn, req) // Attempt an insert again.
	return err
}

/*
Useful output for debugging batching issues
func debugInsert() {
	m := &dto.Metric{}
	dbBatchInsertDuration.Write(m)
	durs := time.Duration(*m.Histogram.SampleSum * float64(time.Second))
	cnt := *m.Histogram.SampleCount
	numRowsPerBatch.Write(m)
	rows := *m.Histogram.SampleSum
	numInsertsPerBatch.Write(m)
	inserts := *m.Histogram.SampleSum

	fmt.Println("avg:  duration/row", durs/time.Duration(rows), "rows/batch", uint64(rows)/cnt, "inserts/batch", uint64(inserts)/cnt, "rows/insert", rows/inserts)
}
*/

// insertSeries performs the insertion of time-series into the DB. It returns
// the first error encountered (if any) and the lowest minimum timestamp seen
// across the batch, which the decompression retry path uses.
func insertSeries(conn pgxconn.PgxConn, reqs ...copyRequest) (error, int64) {
	batch := conn.NewBatch()

	numRowsPerInsert := make([]int, 0, len(reqs))
	numRowsTotal := 0
	totalSamples := 0
	totalExemplars := 0
	lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64)
	lowestMinTime := int64(math.MaxInt64)
	for r := range reqs {
		req := &reqs[r]
		numSamples, numExemplars := req.data.batch.Count()
		NumRowsPerInsert.Observe(float64(numSamples + numExemplars))

		// Flatten the various series into arrays.
		// There are four main bottlenecks for insertion:
		//   1. The round trip time.
		//   2. The number of requests sent.
		//   3. The number of individual INSERT statements.
		//   4. The amount of data sent.
		// While the first two of these can be handled by batching, for the
		// latter two we need to actually reduce the work done. It turns out
		// that simply collecting all the data into a postgres array and
		// performing a single INSERT using that overcomes most of the
		// performance issues of sending many rows, and brings INSERT nearly
		// on par with CopyFrom. In the future we may wish to send compressed
		// data instead.
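		// As an illustration, the single-statement array insert is roughly
		// equivalent to (hypothetical SQL; the real logic lives inside the
		// insert_metric_row database function):
		//   INSERT INTO metric_table (time, value, series_id)
		//   SELECT * FROM unnest($1::TIMESTAMPTZ[], $2::DOUBLE PRECISION[], $3::BIGINT[]);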
		var (
			hasSamples   bool
			hasExemplars bool

			timeSamples   []time.Time
			timeExemplars []time.Time

			valSamples   []float64
			valExemplars []float64

			seriesIdSamples   []int64
			seriesIdExemplars []int64

			exemplarLbls [][]string
		)

		if numSamples > 0 {
			timeSamples = make([]time.Time, 0, numSamples)
			valSamples = make([]float64, 0, numSamples)
			seriesIdSamples = make([]int64, 0, numSamples)
		}
		if numExemplars > 0 {
			timeExemplars = make([]time.Time, 0, numExemplars)
			valExemplars = make([]float64, 0, numExemplars)
			seriesIdExemplars = make([]int64, 0, numExemplars)
			exemplarLbls = make([][]string, 0, numExemplars)
		}

		visitor := req.data.batch.Visitor()
		err := visitor.Visit(
			func(t time.Time, v float64, seriesId int64) {
				hasSamples = true
				timeSamples = append(timeSamples, t)
				valSamples = append(valSamples, v)
				seriesIdSamples = append(seriesIdSamples, seriesId)
			},
			func(t time.Time, v float64, seriesId int64, lvalues []string) {
				hasExemplars = true
				timeExemplars = append(timeExemplars, t)
				valExemplars = append(valExemplars, v)
				seriesIdExemplars = append(seriesIdExemplars, seriesId)
				exemplarLbls = append(exemplarLbls, lvalues)
			},
		)
		if err != nil {
			return err, lowestMinTime
		}
		epoch := visitor.LowestEpoch()
		if epoch < lowestEpoch {
			lowestEpoch = epoch
		}
		minTime := visitor.MinTime()
		if minTime < lowestMinTime {
			lowestMinTime = minTime
		}

		numRowsTotal += numSamples + numExemplars
		totalSamples += numSamples
		totalExemplars += numExemplars
		if hasSamples {
			numRowsPerInsert = append(numRowsPerInsert, numSamples)
			batch.Queue("SELECT "+schema.Catalog+".insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", req.table, timeSamples, valSamples, seriesIdSamples)
		}
		if hasExemplars {
			// We cannot send a 2-D [][]TEXT to Postgres via the pgx encoder.
			// For this reason, and for easier querying, we create a new type
			// in Postgres named SCHEMA_PROM.label_value_array and use an
			// array of that type (forming a 2-D array of TEXT), which is
			// then pushed using the unnest approach.
			labelValues := pgmodel.GetCustomType(pgmodel.LabelValueArray)
			if err := labelValues.Set(exemplarLbls); err != nil {
				return fmt.Errorf("setting prom_api.label_value_array[] value: %w", err), lowestMinTime
			}
			numRowsPerInsert = append(numRowsPerInsert, numExemplars)
			batch.Queue("SELECT "+schema.Catalog+".insert_exemplar_row($1::NAME, $2::TIMESTAMPTZ[], $3::BIGINT[], $4::"+schema.Prom+".label_value_array[], $5::DOUBLE PRECISION[])", req.table, timeExemplars, seriesIdExemplars, labelValues, valExemplars)
		}
	}

	// Note: the epoch increment takes an ACCESS EXCLUSIVE lock on the table
	// before incrementing, so we don't need row locking here. By doing this
	// check at the end we can waste some work on the inserts above if the
	// check fails, but that is rare. Avoiding an additional loop or
	// memoization to find the lowest epoch ahead of time seems worth it.
	epochCheck := fmt.Sprintf("SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN %s.epoch_abort($1) END FROM %s.ids_epoch LIMIT 1", schema.Catalog, schema.Catalog)
	batch.Queue(epochCheck, int64(lowestEpoch))

	NumRowsPerBatch.Observe(float64(numRowsTotal))
	NumInsertsPerBatch.Observe(float64(len(reqs)))
	start := time.Now()
	results, err := conn.SendBatch(context.Background(), batch)
	if err != nil {
		return err, lowestMinTime
	}
	defer results.Close()

	var affectedMetrics uint64
	for _, numRows := range numRowsPerInsert {
		var insertedRows int64
		err := results.QueryRow().Scan(&insertedRows)
		if err != nil {
			return err, lowestMinTime
		}
		numRowsExpected := int64(numRows)
		if numRowsExpected != insertedRows {
			affectedMetrics++
			registerDuplicates(numRowsExpected - insertedRows)
		}
	}
	numSamplesInserted.Add(float64(totalSamples))
	numExemplarsInserted.Add(float64(totalExemplars))

	var val []byte
	row := results.QueryRow()
	err = row.Scan(&val)
	if err != nil {
		return err, lowestMinTime
	}
	reportDuplicates(affectedMetrics)
	DbBatchInsertDuration.Observe(time.Since(start).Seconds())
	return nil, lowestMinTime
}

func insertMetadata(conn pgxconn.PgxConn, reqs []pgmodel.Metadata) (insertedRows uint64, err error) {
	numRows := len(reqs)
	timeSlice := make([]time.Time, numRows)
	metricFamilies := make([]string, numRows)
	units := make([]string, numRows)
	types := make([]string, numRows)
	helps := make([]string, numRows)
	n := time.Now()
	for i := range reqs {
		timeSlice[i] = n
		metricFamilies[i] = reqs[i].MetricFamily
		units[i] = reqs[i].Unit
		types[i] = reqs[i].Type
		helps[i] = reqs[i].Help
	}
	start := time.Now()
	row := conn.QueryRow(context.Background(), "SELECT "+schema.Catalog+".insert_metric_metadatas($1::TIMESTAMPTZ[], $2::TEXT[], $3::TEXT[], $4::TEXT[], $5::TEXT[])",
		timeSlice, metricFamilies, types, units, helps)
	if err := row.Scan(&insertedRows); err != nil {
		return 0, fmt.Errorf("send metadata batch: %w", err)
	}
	MetadataBatchInsertDuration.Observe(time.Since(start).Seconds())
	return insertedRows, nil
}

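// Illustrative call of insertMetadata (hypothetical values):
/*
inserted, err := insertMetadata(conn, []pgmodel.Metadata{{
	MetricFamily: "http_requests_total",
	Type:         "counter",
	Unit:         "",
	Help:         "Total number of HTTP requests.",
}})
*/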